Skip to content

Commit 7d1384d

Browse files
committed
Use Papi or Batch log for retry with more memory.
1 parent f5b9632 commit 7d1384d

File tree

10 files changed

+180
-87
lines changed

10 files changed

+180
-87
lines changed

backend/src/main/scala/cromwell/backend/io/JobPaths.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ trait JobPaths {
8080
lazy val dockerCid = callExecutionRoot.resolve(dockerCidFilename)
8181
lazy val returnCode = callExecutionRoot.resolve(returnCodeFilename)
8282
lazy val memoryRetryRC = callExecutionRoot.resolve(memoryRetryRCFilename)
83+
// Path to to an existing file that contains the error text of the job if it failed due to memory constraints.
84+
lazy val memoryRetryError = Option(standardPaths.error)
8385

8486
// This is a `def` because `standardPaths` is a `var` that may be reassigned during the calculation of
8587
// standard output and error file names.

backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala

Lines changed: 93 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,7 +1416,7 @@ trait StandardAsyncExecutionActor
14161416
): Future[ExecutionHandle] = {
14171417

14181418
// Returns true if the task has written an RC file that indicates OOM, false otherwise
1419-
def memoryRetryRC: Future[Boolean] = {
1419+
def memoryRetryRC: Future[(Boolean, Option[Path])] = {
14201420

14211421
def readFile(path: Path, maxBytes: Option[Int]): Future[String] =
14221422
asyncIo.contentAsStringAsync(path, maxBytes, failOnOverflow = false)
@@ -1438,23 +1438,37 @@ trait StandardAsyncExecutionActor
14381438
}
14391439
}
14401440

1441-
def checkMemoryRetryStderr(errorKeys: List[String], maxBytes: Int): Future[Boolean] =
1442-
readFile(jobPaths.standardPaths.error, Option(maxBytes)) map { errorContent =>
1441+
def checkMemoryRetryStderr(memoryRetryError: Path, errorKeys: List[String], maxBytes: Int): Future[Boolean] =
1442+
readFile(memoryRetryError, Option(maxBytes)) map { errorContent =>
14431443
errorKeys.exists(errorContent.contains)
14441444
}
14451445

1446-
asyncIo.existsAsync(jobPaths.memoryRetryRC) flatMap {
1447-
case true => checkMemoryRetryRC()
1448-
case false =>
1449-
(memoryRetryErrorKeys, memoryRetryStderrLimit) match {
1450-
case (Some(keys), Some(limit)) =>
1451-
asyncIo.existsAsync(jobPaths.standardPaths.error) flatMap {
1452-
case true => checkMemoryRetryStderr(keys, limit)
1453-
case false => Future.successful(false)
1454-
}
1455-
case _ => Future.successful(false)
1456-
}
1457-
}
1446+
def checkMemoryRetryError(): Future[Boolean] =
1447+
(memoryRetryErrorKeys, memoryRetryStderrLimit, jobPaths.memoryRetryError) match {
1448+
case (Some(keys), Some(limit), Some(memoryRetryError)) =>
1449+
for {
1450+
memoryRetryErrorExists <- asyncIo.existsAsync(memoryRetryError)
1451+
memoryRetryErrorFound <-
1452+
if (memoryRetryErrorExists)
1453+
checkMemoryRetryStderr(memoryRetryError, keys, limit)
1454+
else
1455+
Future.successful(false)
1456+
} yield memoryRetryErrorFound
1457+
case _ => Future.successful(false)
1458+
}
1459+
1460+
// For backwards behavioral compatibility, check for the old memory retry RC file first. That file used to catch
1461+
// the errors from the standard error file, but now sometimes the error is written to a separate log file.
1462+
// If it exists, check its contents. If it doesn't find an OOM code, check the new memory retry error file.
1463+
for {
1464+
memoryRetryRCExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
1465+
memoryRetryRCErrorFound <- if (memoryRetryRCExists) checkMemoryRetryRC() else Future.successful(false)
1466+
memoryRetryErrorFound <- if (memoryRetryRCErrorFound) Future.successful(true) else checkMemoryRetryError()
1467+
memoryErrorPathOption =
1468+
if (memoryRetryRCErrorFound) Option(jobPaths.standardPaths.error)
1469+
else if (memoryRetryErrorFound) jobPaths.memoryRetryError
1470+
else None
1471+
} yield (memoryRetryErrorFound, memoryErrorPathOption)
14581472
}
14591473

14601474
val stderr = jobPaths.standardPaths.error
@@ -1465,74 +1479,76 @@ trait StandardAsyncExecutionActor
14651479
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
14661480
// may fail due to race conditions on quickly-executing jobs.
14671481
stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future.successful(0L)
1468-
outOfMemoryDetected <- memoryRetryRC
1469-
} yield (stderrSize, returnCodeAsString, outOfMemoryDetected)
1470-
1471-
stderrSizeAndReturnCodeAndMemoryRetry flatMap { case (stderrSize, returnCodeAsString, outOfMemoryDetected) =>
1472-
val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)
1473-
1474-
if (isDone(status)) {
1475-
tryReturnCodeAsInt match {
1476-
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
1477-
val executionHandle = Future.successful(
1478-
FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption),
1479-
Option(returnCodeAsInt),
1480-
None
1482+
(outOfMemoryDetected, outOfMemoryPathOption) <- memoryRetryRC
1483+
} yield (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption)
1484+
1485+
stderrSizeAndReturnCodeAndMemoryRetry flatMap {
1486+
case (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption) =>
1487+
val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)
1488+
1489+
if (isDone(status)) {
1490+
tryReturnCodeAsInt match {
1491+
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
1492+
val executionHandle = Future.successful(
1493+
FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption),
1494+
Option(returnCodeAsInt),
1495+
None
1496+
)
14811497
)
1482-
)
1483-
retryElseFail(executionHandle)
1484-
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
1485-
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
1486-
// It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
1487-
// if it was caused by OOM killer, want to handle as OOM and not job abort.
1488-
case Success(returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
1489-
val executionHandle = Future.successful(
1490-
FailedNonRetryableExecutionHandle(
1491-
RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log),
1492-
Option(returnCodeAsInt),
1493-
None
1498+
retryElseFail(executionHandle)
1499+
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
1500+
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
1501+
// It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
1502+
// if it was caused by OOM killer, want to handle as OOM and not job abort.
1503+
case Success(returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
1504+
val executionHandle = Future.successful(
1505+
FailedNonRetryableExecutionHandle(
1506+
RetryWithMoreMemory(jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
1507+
Option(returnCodeAsInt),
1508+
None
1509+
)
14941510
)
1495-
)
1496-
retryElseFail(executionHandle,
1497-
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
1498-
)
1499-
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
1500-
Future.successful(AbortedExecutionHandle)
1501-
case Success(returnCodeAsInt) =>
1502-
val executionHandle = Future.successful(
1503-
FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
1504-
Option(returnCodeAsInt),
1505-
None
1511+
retryElseFail(executionHandle,
1512+
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
15061513
)
1507-
)
1508-
retryElseFail(executionHandle)
1509-
case Failure(_) =>
1510-
Future.successful(
1511-
FailedNonRetryableExecutionHandle(
1512-
ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
1513-
kvPairsToSave = None
1514+
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
1515+
Future.successful(AbortedExecutionHandle)
1516+
case Success(returnCodeAsInt) =>
1517+
val executionHandle = Future.successful(
1518+
FailedNonRetryableExecutionHandle(
1519+
WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
1520+
Option(returnCodeAsInt),
1521+
None
1522+
)
15141523
)
1515-
)
1516-
}
1517-
} else {
1518-
tryReturnCodeAsInt match {
1519-
case Success(returnCodeAsInt)
1520-
if outOfMemoryDetected && memoryRetryRequested && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
1521-
val executionHandle = Future.successful(
1522-
FailedNonRetryableExecutionHandle(
1523-
RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log),
1524-
Option(returnCodeAsInt),
1525-
None
1524+
retryElseFail(executionHandle)
1525+
case Failure(_) =>
1526+
Future.successful(
1527+
FailedNonRetryableExecutionHandle(
1528+
ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
1529+
kvPairsToSave = None
1530+
)
15261531
)
1527-
)
1528-
retryElseFail(executionHandle,
1529-
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
1530-
)
1531-
case _ =>
1532-
val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
1533-
retryElseFail(failureStatus)
1532+
}
1533+
} else {
1534+
tryReturnCodeAsInt match {
1535+
case Success(returnCodeAsInt)
1536+
if outOfMemoryDetected && memoryRetryRequested && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
1537+
val executionHandle = Future.successful(
1538+
FailedNonRetryableExecutionHandle(
1539+
RetryWithMoreMemory(jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
1540+
Option(returnCodeAsInt),
1541+
None
1542+
)
1543+
)
1544+
retryElseFail(executionHandle,
1545+
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
1546+
)
1547+
case _ =>
1548+
val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
1549+
retryElseFail(failureStatus)
1550+
}
15341551
}
1535-
}
15361552
} recoverWith { case exception =>
15371553
if (isDone(status)) Future.successful(FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None))
15381554
else {

centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/retry_with_more_memory.wdl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
version 1.0
22

33
task imitate_oom_error {
4+
meta {
5+
volatile: true
6+
}
47
command {
58
echo "$MEM_SIZE $MEM_UNIT"
69

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ import cromwell.backend.google.batch.models.RunStatus.TerminalRunStatus
2727
import cromwell.backend.google.batch.models._
2828
import cromwell.backend.google.batch.monitoring.{BatchInstrumentation, CheckpointingConfiguration, MonitoringImage}
2929
import cromwell.backend.google.batch.runnable.WorkflowOptionKeys
30-
import cromwell.backend.google.batch.util.{GcpBatchReferenceFilesMappingOperations, RuntimeOutputMapping}
30+
import cromwell.backend.google.batch.util.{
31+
GcpBatchReferenceFilesMappingOperations,
32+
MemoryRetryRunnable,
33+
MemoryRetryStandard,
34+
RuntimeOutputMapping
35+
}
3136
import cromwell.backend.standard._
3237
import cromwell.core._
3338
import cromwell.core.io.IoCommandBuilder
@@ -633,7 +638,14 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
633638

634639
// if the `memory_retry_multiplier` is not present in the workflow options there is no need to check whether or
635640
// not the `stderr` file contained memory retry error keys
636-
val retryWithMoreMemoryKeys: Option[List[String]] = memoryRetryFactor.flatMap(_ => memoryRetryErrorKeys)
641+
// If the retry detection is processed using the stderr then do not try retry with more memory in the backend.
642+
// This keeps the backend jobs from logging the memory retry error keys in the very verbose Google Cloud Batch
643+
// logs, tripping up the standard memory retry detection.
644+
val retryWithMoreMemoryKeys: Option[List[String]] =
645+
batchConfiguration.batchAttributes.memoryRetryCheckMode match {
646+
case MemoryRetryRunnable => memoryRetryFactor.flatMap(_ => memoryRetryErrorKeys)
647+
case MemoryRetryStandard => None
648+
}
637649

638650
val targetLogFile = batchAttributes.logsPolicy match {
639651
case GcpBatchLogsPolicy.CloudLogging => None

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchConfigurationAttributes.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.{
2222
GcsTransferConfiguration,
2323
VirtualPrivateCloudConfiguration
2424
}
25-
import cromwell.backend.google.batch.util.GcpBatchReferenceFilesMappingOperations
25+
import cromwell.backend.google.batch.util.{GcpBatchReferenceFilesMappingOperations, MemoryRetryCheckMode}
2626
import cromwell.cloudsupport.gcp.GoogleConfiguration
2727
import cromwell.cloudsupport.gcp.auth.GoogleAuthMode
2828
import cromwell.docker.DockerMirroring
@@ -58,7 +58,8 @@ case class GcpBatchConfigurationAttributes(
5858
referenceFileToDiskImageMappingOpt: Option[Map[String, GcpBatchReferenceFilesDisk]],
5959
checkpointingInterval: FiniteDuration,
6060
logsPolicy: GcpBatchLogsPolicy,
61-
maxTransientErrorRetries: Int
61+
maxTransientErrorRetries: Int,
62+
memoryRetryCheckMode: MemoryRetryCheckMode
6263
)
6364

6465
object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOperations with StrictLogging {
@@ -89,6 +90,7 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
8990
val DefaultGcsTransferAttempts: Refined[Int, Positive] = refineMV[Positive](3)
9091

9192
val checkpointingIntervalKey = "checkpointing-interval"
93+
val memoryRetryCheckModeKey = "memory-retry-check-mode"
9294

9395
private val batchKeys = CommonBackendConfigurationAttributes.commonValidConfigurationAttributeKeys ++ Set(
9496
"project",
@@ -128,7 +130,8 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
128130
"virtual-private-cloud.subnetwork-label-key",
129131
"virtual-private-cloud.auth",
130132
"reference-disk-localization-manifests",
131-
checkpointingIntervalKey
133+
checkpointingIntervalKey,
134+
memoryRetryCheckModeKey
132135
)
133136

134137
private val deprecatedBatchKeys: Map[String, String] = Map(
@@ -311,6 +314,13 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
311314
val maxTransientErrorRetries: Int =
312315
backendConfig.as[Option[Int]]("max-transient-error-retries").getOrElse(10)
313316

317+
val memoryRetryCheckMode: ErrorOr[MemoryRetryCheckMode] =
318+
MemoryRetryCheckMode
319+
.tryParse(
320+
backendConfig.getOrElse(memoryRetryCheckModeKey, MemoryRetryCheckMode.DefaultMode.name)
321+
)
322+
.toErrorOr
323+
314324
def authGoogleConfigForBatchConfigurationAttributes(
315325
project: String,
316326
bucket: String,
@@ -327,7 +337,8 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
327337
virtualPrivateCloudConfiguration: VirtualPrivateCloudConfiguration,
328338
batchRequestTimeoutConfiguration: BatchRequestTimeoutConfiguration,
329339
referenceDiskLocalizationManifestFilesOpt: Option[List[ManifestFile]],
330-
logsPolicy: GcpBatchLogsPolicy
340+
logsPolicy: GcpBatchLogsPolicy,
341+
memoryRetryCheckMode: MemoryRetryCheckMode
331342
): ErrorOr[GcpBatchConfigurationAttributes] =
332343
(googleConfig.auth(batchName), googleConfig.auth(gcsName)) mapN { (batchAuth, gcsAuth) =>
333344
val generatedReferenceFilesMappingOpt = referenceDiskLocalizationManifestFilesOpt map {
@@ -354,7 +365,8 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
354365
referenceFileToDiskImageMappingOpt = generatedReferenceFilesMappingOpt,
355366
checkpointingInterval = checkpointingInterval,
356367
logsPolicy = logsPolicy,
357-
maxTransientErrorRetries = maxTransientErrorRetries
368+
maxTransientErrorRetries = maxTransientErrorRetries,
369+
memoryRetryCheckMode = memoryRetryCheckMode
358370
)
359371
}
360372

@@ -373,7 +385,8 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
373385
virtualPrivateCloudConfiguration,
374386
batchRequestTimeoutConfigurationValidation,
375387
referenceDiskLocalizationManifestFiles,
376-
logsPolicy
388+
logsPolicy,
389+
memoryRetryCheckMode
377390
) flatMapN authGoogleConfigForBatchConfigurationAttributes match {
378391
case Valid(r) => r
379392
case Invalid(f) =>

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchJobPaths.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cromwell.backend.google.batch.models
22

33
import cromwell.backend.BackendJobDescriptorKey
44
import cromwell.backend.google.batch.runnable.GcpBatchMetadataKeys
5+
import cromwell.backend.google.batch.util.{MemoryRetryRunnable, MemoryRetryStandard}
56
import cromwell.backend.io.JobPaths
67
import cromwell.core.path.Path
78
import cromwell.services.metadata.CallMetadataKeys
@@ -48,6 +49,12 @@ case class GcpBatchJobPaths(override val workflowPaths: GcpBatchWorkflowPaths,
4849
case _ => None
4950
}
5051

52+
override lazy val memoryRetryError: Option[Path] =
53+
workflowPaths.gcpBatchConfiguration.batchAttributes.memoryRetryCheckMode match {
54+
case MemoryRetryRunnable => None
55+
case MemoryRetryStandard => maybeBatchLogPath
56+
}
57+
5158
override lazy val customMetadataPaths = {
5259
val backendLogsMetadata = maybeBatchLogPath map { p: Path =>
5360
Map(CallMetadataKeys.BackendLogsPrefix + ":log" -> p)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package cromwell.backend.google.batch.util
2+
3+
import scala.util.{Failure, Success, Try}
4+
5+
sealed trait MemoryRetryCheckMode {
6+
val name: String
7+
8+
override def toString: String = name
9+
}
10+
11+
case object MemoryRetryRunnable extends MemoryRetryCheckMode {
12+
override val name: String = "Runnable"
13+
}
14+
15+
case object MemoryRetryStandard extends MemoryRetryCheckMode {
16+
override val name: String = "Standard"
17+
}
18+
19+
object MemoryRetryCheckMode {
20+
val DefaultMode = MemoryRetryRunnable
21+
private val AllModes: Seq[MemoryRetryCheckMode] = Seq(MemoryRetryRunnable, MemoryRetryStandard)
22+
23+
def tryParse(mode: String): Try[MemoryRetryCheckMode] =
24+
AllModes
25+
.find(_.name.equalsIgnoreCase(mode))
26+
.map(Success(_))
27+
.getOrElse(
28+
Failure(
29+
new Exception(
30+
s"Invalid memory retry check mode: '$mode', supported modes are: ${AllModes.map(_.name).mkString("'", "', '", "'")}"
31+
)
32+
)
33+
)
34+
}

0 commit comments

Comments
 (0)