diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b2e78fac2c..a7acfa87513 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,10 @@ The `IX_WORKFLOW_STORE_ENTRY_WS` index is removed from `WORKFLOW_STORE_ENTRY`. The index had low cardinality and workflow pickup is faster without it. Migration time depends on workflow store size, but should be very fast for most installations. Terminal workflows are removed from the workflow store, so only running workflows contribute to the cost. +### AWS ECR Remote Hashing Support + +Cromwell now supports remote hashing for both public and private AWS ECR containers. + ## 87 Release Notes ### GCP Batch diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 4d5f280079a..c1bfdb168aa 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -416,6 +416,13 @@ docker { max-retries = 3 // Supported registries (Docker Hub, Google, Quay) can have additional configuration set separately + aws { + throttle { + number-of-requests = 1000 + per = 100 seconds + } + num-threads = 10 + } azure { // Worst case `ReadOps per minute` value from official docs // https://github.com/MicrosoftDocs/azure-docs/blob/main/includes/container-registry-limits.md diff --git a/dockerHashing/src/main/scala/cromwell/docker/DockerImageIdentifier.scala b/dockerHashing/src/main/scala/cromwell/docker/DockerImageIdentifier.scala index 7c20760b644..8432fb85e56 100644 --- a/dockerHashing/src/main/scala/cromwell/docker/DockerImageIdentifier.scala +++ b/dockerHashing/src/main/scala/cromwell/docker/DockerImageIdentifier.scala @@ -1,5 +1,6 @@ package cromwell.docker +import cromwell.docker.registryv2.flows.aws.AwsElasticContainerRegistry.isEcr import cromwell.docker.registryv2.flows.azure.AzureContainerRegistry import scala.util.{Failure, Success, Try} @@ -19,7 +20,9 @@ sealed trait DockerImageIdentifier { lazy val nameWithDefaultRepository = // In ACR, the repository is part of the registry domain instead of the path // e.g. `terrabatchdev.azurecr.io` - if (host.exists(_.contains(AzureContainerRegistry.domain))) + // In ECR, an image with no repository is supported + // e.g. 123456790.dkr.ecr.eu-west-2.amazonaws.com/example-tool + if (host.exists(_.contains(AzureContainerRegistry.domain)) || host.exists(isEcr)) image else repository.getOrElse("library") + s"/$image" diff --git a/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala b/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala index 3b3c0e5d7c5..e7bb5e7777b 100644 --- a/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala +++ b/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala @@ -14,6 +14,7 @@ import cromwell.core.actor.StreamIntegration.{BackPressure, StreamContext} import cromwell.core.{Dispatcher, DockerConfiguration} import cromwell.docker.DockerInfoActor._ import cromwell.docker.registryv2.DockerRegistryV2Abstract +import cromwell.docker.registryv2.flows.aws.AwsElasticContainerRegistry import cromwell.docker.registryv2.flows.azure.AzureContainerRegistry import cromwell.docker.registryv2.flows.dockerhub.DockerHubRegistry import cromwell.docker.registryv2.flows.google.GoogleRegistry @@ -239,6 +240,7 @@ object DockerInfoActor { // To add a new registry, simply add it to that list List( + ("aws", { c: DockerRegistryConfig => new AwsElasticContainerRegistry(c) }), ("azure", { c: DockerRegistryConfig => new AzureContainerRegistry(c) }), ("dockerhub", { c: DockerRegistryConfig => new DockerHubRegistry(c) }), ("google", { c: DockerRegistryConfig => new GoogleRegistry(c) }), diff --git a/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala b/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala index 234b762429d..bb1b717209f 100644 --- a/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala +++ b/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala @@ -9,6 +9,7 @@ import cromwell.docker._ import cromwell.docker.registryv2.DockerRegistryV2Abstract._ import io.circe.Decoder import io.circe.generic.auto._ +import org.apache.commons.codec.digest.DigestUtils import org.http4s.Uri.{Authority, Scheme} import org.http4s._ import org.http4s.circe._ @@ -156,6 +157,11 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi .handleErrorWith(tryOCIManifest) } + /** + * Gets the authorization scheme to use for the token request. + */ + protected def getAuthorizationScheme(dockerImageIdentifier: DockerImageIdentifier): AuthScheme = AuthScheme.Bearer + /** * Returns true if this flow is able to process this docker image, * false otherwise @@ -240,7 +246,7 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi manifestHeader: Accept ): IO[Request[IO]] = { val authorizationHeader: Option[Authorization] = - token.map(t => Authorization(Credentials.Token(AuthScheme.Bearer, t))) + token.map(t => Authorization(Credentials.Token(getAuthorizationScheme(imageId), t))) val request = Method.GET( buildManifestUri(imageId), List( @@ -316,7 +322,8 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi } private def getDigestFromResponse(response: Response[IO]): IO[DockerHashResult] = response match { - case Status.Successful(r) => extractDigestFromHeaders(r.headers) + case Status.Successful(r) => + extractDigestFromHeaders(r.headers).handleErrorWith(_ => calculateDigestFromBody(r.bodyText)) case Status.Unauthorized(r) => r.as[String].flatMap(body => IO.raiseError(new Unauthorized(r.status.toString + " " + body))) case Status.NotFound(r) => r.as[String].flatMap(body => IO.raiseError(new NotFound(r.status.toString + " " + body))) @@ -329,4 +336,10 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi case Some(digest) => IO.fromEither(DockerHashResult.fromString(digest.value).toEither) case None => IO.raiseError(new Exception(s"Manifest response did not have a digest header")) } + + private def calculateDigestFromBody(body: fs2.Stream[IO, String]): IO[DockerHashResult] = + body.compile.string + .map(s => "sha256:" + DigestUtils.sha256Hex(s)) + .map(DockerHashResult.fromString) + .flatMap(IO.fromTry) } diff --git a/dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/aws/AwsElasticContainerRegistry.scala b/dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/aws/AwsElasticContainerRegistry.scala new file mode 100644 index 00000000000..c2aeda60b33 --- /dev/null +++ b/dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/aws/AwsElasticContainerRegistry.scala @@ -0,0 +1,78 @@ +package cromwell.docker.registryv2.flows.aws + +import cats.effect.IO +import cromwell.docker.{DockerImageIdentifier, DockerInfoActor, DockerRegistryConfig} +import cromwell.docker.registryv2.DockerRegistryV2Abstract +import cromwell.docker.registryv2.flows.aws.AwsElasticContainerRegistry.{isEcr, isPublicEcr} +import org.http4s.{AuthScheme, Header} +import org.http4s.client.Client +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.ecr.EcrClient +import software.amazon.awssdk.services.ecrpublic.EcrPublicClient +import software.amazon.awssdk.services.ecrpublic.model.GetAuthorizationTokenRequest + +import scala.compat.java8.OptionConverters.RichOptionalGeneric + +class AwsElasticContainerRegistry(config: DockerRegistryConfig) extends DockerRegistryV2Abstract(config) { + + private lazy val ecrClient = EcrClient.create() + private lazy val ecrPublicClient = EcrPublicClient.builder().region(Region.US_EAST_1).build() + + override def getAuthorizationScheme(dockerImageIdentifier: DockerImageIdentifier): AuthScheme = + if (isPublicEcr(dockerImageIdentifier.hostAsString)) AuthScheme.Bearer else AuthScheme.Basic + + override def accepts(dockerImageIdentifier: DockerImageIdentifier): Boolean = + isEcr(dockerImageIdentifier.hostAsString) + + /** + * (e.g registry-1.docker.io) + */ + override def registryHostName(dockerImageIdentifier: DockerImageIdentifier): String = + dockerImageIdentifier.host.getOrElse("") + + /** + * (e.g auth.docker.io) + */ + override def authorizationServerHostName(dockerImageIdentifier: DockerImageIdentifier): String = + dockerImageIdentifier.host.getOrElse("") + + override def getToken( + dockerInfoContext: DockerInfoActor.DockerInfoContext + )(implicit client: Client[IO]): IO[Option[String]] = + if (isPublicEcr(dockerInfoContext.dockerImageID.hostAsString)) getPublicEcrToken + else getPrivateEcrToken + + /** + * Builds the list of headers for the token request + */ + override def buildTokenRequestHeaders(dockerInfoContext: DockerInfoActor.DockerInfoContext): List[Header] = + List.empty + + private def getPublicEcrToken: IO[Option[String]] = + IO( + Option( + ecrPublicClient + .getAuthorizationToken(GetAuthorizationTokenRequest.builder().build()) + .authorizationData() + .authorizationToken() + ) + ) + + private def getPrivateEcrToken: IO[Option[String]] = + IO( + ecrClient + .getAuthorizationToken() + .authorizationData() + .stream() + .findFirst() + .asScala + .map(_.authorizationToken()) + ) + +} + +object AwsElasticContainerRegistry { + def isEcr(host: String): Boolean = isPublicEcr(host) || isPrivateEcr(host) + def isPublicEcr(host: String): Boolean = host.contains("public.ecr.aws") + def isPrivateEcr(host: String): Boolean = host.contains("amazonaws.com") +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b1acd1e30aa..e68021d7dac 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -402,6 +402,8 @@ object Dependencies { "batch", "core", "cloudwatchlogs", + "ecr", + "ecrpublic", "s3", "sts", ).map(artifactName => "software.amazon.awssdk" % artifactName % awsSdkV)