@@ -1416,7 +1416,7 @@ trait StandardAsyncExecutionActor
1416
1416
): Future [ExecutionHandle ] = {
1417
1417
1418
1418
// Returns true if the task has written an RC file that indicates OOM, false otherwise
1419
- def memoryRetryRC : Future [Boolean ] = {
1419
+ def memoryRetryRC : Future [( Boolean , Option [ Path ]) ] = {
1420
1420
1421
1421
def readFile (path : Path , maxBytes : Option [Int ]): Future [String ] =
1422
1422
asyncIo.contentAsStringAsync(path, maxBytes, failOnOverflow = false )
@@ -1439,22 +1439,33 @@ trait StandardAsyncExecutionActor
1439
1439
}
1440
1440
1441
1441
def checkMemoryRetryStderr (errorKeys : List [String ], maxBytes : Int ): Future [Boolean ] =
1442
- readFile(jobPaths.standardPaths.error , Option (maxBytes)) map { errorContent =>
1442
+ readFile(jobPaths.memoryRetryError , Option (maxBytes)) map { errorContent =>
1443
1443
errorKeys.exists(errorContent.contains)
1444
1444
}
1445
1445
1446
- asyncIo.existsAsync(jobPaths.memoryRetryRC) flatMap {
1447
- case true => checkMemoryRetryRC()
1448
- case false =>
1449
- (memoryRetryErrorKeys, memoryRetryStderrLimit) match {
1450
- case (Some (keys), Some (limit)) =>
1451
- asyncIo.existsAsync(jobPaths.standardPaths.error) flatMap {
1452
- case true => checkMemoryRetryStderr(keys, limit)
1453
- case false => Future .successful(false )
1454
- }
1455
- case _ => Future .successful(false )
1456
- }
1457
- }
1446
+ def checkMemoryRetryError (): Future [Boolean ] =
1447
+ (memoryRetryErrorKeys, memoryRetryStderrLimit) match {
1448
+ case (Some (keys), Some (limit)) =>
1449
+ for {
1450
+ memoryRetryErrorExists <- asyncIo.existsAsync(jobPaths.memoryRetryError)
1451
+ memoryRetryErrorFound <-
1452
+ if (memoryRetryErrorExists) checkMemoryRetryStderr(keys, limit) else Future .successful(false )
1453
+ } yield memoryRetryErrorFound
1454
+ case _ => Future .successful(false )
1455
+ }
1456
+
1457
+ // For backwards behavioral compatibility, check for the old memory retry RC file first. That file used to catch
1458
+ // the errors from the standard error file, but now sometimes the error is written to a separate log file.
1459
+ // If it exists, check its contents. If it doesn't find an OOM code, check the new memory retry error file.
1460
+ for {
1461
+ memoryRetryRCExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
1462
+ memoryRetryRCErrorFound <- if (memoryRetryRCExists) checkMemoryRetryRC() else Future .successful(false )
1463
+ memoryRetryErrorFound <- if (memoryRetryRCErrorFound) Future .successful(true ) else checkMemoryRetryError()
1464
+ memoryErrorPathOption =
1465
+ if (memoryRetryRCErrorFound) Option (jobPaths.standardPaths.error)
1466
+ else if (memoryRetryErrorFound) Option (jobPaths.memoryRetryError)
1467
+ else None
1468
+ } yield (memoryRetryErrorFound, memoryErrorPathOption)
1458
1469
}
1459
1470
1460
1471
val stderr = jobPaths.standardPaths.error
@@ -1465,74 +1476,76 @@ trait StandardAsyncExecutionActor
1465
1476
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
1466
1477
// may fail due to race conditions on quickly-executing jobs.
1467
1478
stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future .successful(0L )
1468
- outOfMemoryDetected <- memoryRetryRC
1469
- } yield (stderrSize, returnCodeAsString, outOfMemoryDetected)
1470
-
1471
- stderrSizeAndReturnCodeAndMemoryRetry flatMap { case (stderrSize, returnCodeAsString, outOfMemoryDetected) =>
1472
- val tryReturnCodeAsInt = Try (returnCodeAsString.trim.toInt)
1473
-
1474
- if (isDone(status)) {
1475
- tryReturnCodeAsInt match {
1476
- case Success (returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
1477
- val executionHandle = Future .successful(
1478
- FailedNonRetryableExecutionHandle (StderrNonEmpty (jobDescriptor.key.tag, stderrSize, stderrAsOption),
1479
- Option (returnCodeAsInt),
1480
- None
1479
+ (outOfMemoryDetected, outOfMemoryPathOption) <- memoryRetryRC
1480
+ } yield (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption)
1481
+
1482
+ stderrSizeAndReturnCodeAndMemoryRetry flatMap {
1483
+ case (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption) =>
1484
+ val tryReturnCodeAsInt = Try (returnCodeAsString.trim.toInt)
1485
+
1486
+ if (isDone(status)) {
1487
+ tryReturnCodeAsInt match {
1488
+ case Success (returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
1489
+ val executionHandle = Future .successful(
1490
+ FailedNonRetryableExecutionHandle (StderrNonEmpty (jobDescriptor.key.tag, stderrSize, stderrAsOption),
1491
+ Option (returnCodeAsInt),
1492
+ None
1493
+ )
1481
1494
)
1482
- )
1483
- retryElseFail(executionHandle)
1484
- case Success (returnCodeAsInt) if continueOnReturnCode.continueFor( returnCodeAsInt) =>
1485
- handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
1486
- // It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
1487
- // if it was caused by OOM killer, want to handle as OOM and not job abort.
1488
- case Success (returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
1489
- val executionHandle = Future .successful (
1490
- FailedNonRetryableExecutionHandle (
1491
- RetryWithMoreMemory (jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log ),
1492
- Option (returnCodeAsInt),
1493
- None
1495
+ retryElseFail(executionHandle )
1496
+ case Success (returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
1497
+ handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
1498
+ // It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
1499
+ // if it was caused by OOM killer, want to handle as OOM and not job abort.
1500
+ case Success (returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
1501
+ val executionHandle = Future .successful(
1502
+ FailedNonRetryableExecutionHandle (
1503
+ RetryWithMoreMemory (jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
1504
+ Option (returnCodeAsInt ),
1505
+ None
1506
+ )
1494
1507
)
1495
- )
1496
- retryElseFail(executionHandle,
1497
- MemoryRetryResult (outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
1498
- )
1499
- case Success (returnCodeAsInt) if isAbort(returnCodeAsInt) =>
1500
- Future .successful(AbortedExecutionHandle )
1501
- case Success (returnCodeAsInt) =>
1502
- val executionHandle = Future .successful(
1503
- FailedNonRetryableExecutionHandle (WrongReturnCode (jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
1504
- Option (returnCodeAsInt),
1505
- None
1508
+ retryElseFail(executionHandle,
1509
+ MemoryRetryResult (outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
1506
1510
)
1507
- )
1508
- retryElseFail(executionHandle)
1509
- case Failure (_) =>
1510
- Future .successful(
1511
- FailedNonRetryableExecutionHandle (
1512
- ReturnCodeIsNotAnInt (jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
1513
- kvPairsToSave = None
1511
+ case Success (returnCodeAsInt) if isAbort(returnCodeAsInt) =>
1512
+ Future .successful(AbortedExecutionHandle )
1513
+ case Success (returnCodeAsInt) =>
1514
+ val executionHandle = Future .successful(
1515
+ FailedNonRetryableExecutionHandle (
1516
+ WrongReturnCode (jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
1517
+ Option (returnCodeAsInt),
1518
+ None
1519
+ )
1514
1520
)
1515
- )
1516
- }
1517
- } else {
1518
- tryReturnCodeAsInt match {
1519
- case Success (returnCodeAsInt)
1520
- if outOfMemoryDetected && memoryRetryRequested && ! continueOnReturnCode.continueFor(returnCodeAsInt) =>
1521
- val executionHandle = Future .successful(
1522
- FailedNonRetryableExecutionHandle (
1523
- RetryWithMoreMemory (jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log),
1524
- Option (returnCodeAsInt),
1525
- None
1521
+ retryElseFail(executionHandle)
1522
+ case Failure (_) =>
1523
+ Future .successful(
1524
+ FailedNonRetryableExecutionHandle (
1525
+ ReturnCodeIsNotAnInt (jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
1526
+ kvPairsToSave = None
1527
+ )
1526
1528
)
1527
- )
1528
- retryElseFail(executionHandle,
1529
- MemoryRetryResult (outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
1530
- )
1531
- case _ =>
1532
- val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
1533
- retryElseFail(failureStatus)
1529
+ }
1530
+ } else {
1531
+ tryReturnCodeAsInt match {
1532
+ case Success (returnCodeAsInt)
1533
+ if outOfMemoryDetected && memoryRetryRequested && ! continueOnReturnCode.continueFor(returnCodeAsInt) =>
1534
+ val executionHandle = Future .successful(
1535
+ FailedNonRetryableExecutionHandle (
1536
+ RetryWithMoreMemory (jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
1537
+ Option (returnCodeAsInt),
1538
+ None
1539
+ )
1540
+ )
1541
+ retryElseFail(executionHandle,
1542
+ MemoryRetryResult (outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
1543
+ )
1544
+ case _ =>
1545
+ val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
1546
+ retryElseFail(failureStatus)
1547
+ }
1534
1548
}
1535
- }
1536
1549
} recoverWith { case exception =>
1537
1550
if (isDone(status)) Future .successful(FailedNonRetryableExecutionHandle (exception, kvPairsToSave = None ))
1538
1551
else {
0 commit comments