@@ -1361,7 +1361,7 @@ trait StandardAsyncExecutionActor
1361
1361
): Future [ExecutionHandle ] = {
1362
1362
1363
1363
// Returns true if the task has written an RC file that indicates OOM, false otherwise
1364
- def memoryRetryRC : Future [Boolean ] = {
1364
+ def memoryRetryRC : Future [( Boolean , Option [ Path ]) ] = {
1365
1365
1366
1366
def readFile (path : Path , maxBytes : Option [Int ]): Future [String ] =
1367
1367
asyncIo.contentAsStringAsync(path, maxBytes, failOnOverflow = false )
@@ -1384,22 +1384,33 @@ trait StandardAsyncExecutionActor
1384
1384
}
1385
1385
1386
1386
def checkMemoryRetryStderr (errorKeys : List [String ], maxBytes : Int ): Future [Boolean ] =
1387
- readFile(jobPaths.standardPaths.error , Option (maxBytes)) map { errorContent =>
1387
+ readFile(jobPaths.memoryRetryError , Option (maxBytes)) map { errorContent =>
1388
1388
errorKeys.exists(errorContent.contains)
1389
1389
}
1390
1390
1391
- asyncIo.existsAsync(jobPaths.memoryRetryRC) flatMap {
1392
- case true => checkMemoryRetryRC()
1393
- case false =>
1394
- (memoryRetryErrorKeys, memoryRetryStderrLimit) match {
1395
- case (Some (keys), Some (limit)) =>
1396
- asyncIo.existsAsync(jobPaths.standardPaths.error) flatMap {
1397
- case true => checkMemoryRetryStderr(keys, limit)
1398
- case false => Future .successful(false )
1399
- }
1400
- case _ => Future .successful(false )
1401
- }
1402
- }
1391
+ def checkMemoryRetryError (): Future [Boolean ] =
1392
+ (memoryRetryErrorKeys, memoryRetryStderrLimit) match {
1393
+ case (Some (keys), Some (limit)) =>
1394
+ for {
1395
+ memoryRetryErrorExists <- asyncIo.existsAsync(jobPaths.memoryRetryError)
1396
+ memoryRetryErrorFound <-
1397
+ if (memoryRetryErrorExists) checkMemoryRetryStderr(keys, limit) else Future .successful(false )
1398
+ } yield memoryRetryErrorFound
1399
+ case _ => Future .successful(false )
1400
+ }
1401
+
1402
+ // For backwards behavioral compatibility, check for the old memory retry RC file first. That file used to catch
1403
+ // the errors from the standard error file, but now sometimes the error is written to a separate log file.
1404
+ // If it exists, check its contents. If it doesn't find an OOM code, check the new memory retry error file.
1405
+ for {
1406
+ memoryRetryRCExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
1407
+ memoryRetryRCErrorFound <- if (memoryRetryRCExists) checkMemoryRetryRC() else Future .successful(false )
1408
+ memoryRetryErrorFound <- if (memoryRetryRCErrorFound) Future .successful(true ) else checkMemoryRetryError()
1409
+ memoryErrorPathOption =
1410
+ if (memoryRetryRCErrorFound) Option (jobPaths.standardPaths.error)
1411
+ else if (memoryRetryErrorFound) Option (jobPaths.memoryRetryError)
1412
+ else None
1413
+ } yield (memoryRetryErrorFound, memoryErrorPathOption)
1403
1414
}
1404
1415
1405
1416
val stderr = jobPaths.standardPaths.error
@@ -1410,70 +1421,72 @@ trait StandardAsyncExecutionActor
1410
1421
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
1411
1422
// may fail due to race conditions on quickly-executing jobs.
1412
1423
stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future .successful(0L )
1413
- outOfMemoryDetected <- memoryRetryRC
1414
- } yield (stderrSize, returnCodeAsString, outOfMemoryDetected)
1415
-
1416
- stderrSizeAndReturnCodeAndMemoryRetry flatMap { case (stderrSize, returnCodeAsString, outOfMemoryDetected) =>
1417
- val tryReturnCodeAsInt = Try (returnCodeAsString.trim.toInt)
1418
-
1419
- if (isDone(status)) {
1420
- tryReturnCodeAsInt match {
1421
- case Success (returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
1422
- val executionHandle = Future .successful(
1423
- FailedNonRetryableExecutionHandle (StderrNonEmpty (jobDescriptor.key.tag, stderrSize, stderrAsOption),
1424
- Option (returnCodeAsInt),
1425
- None
1424
+ (outOfMemoryDetected, outOfMemoryPathOption) <- memoryRetryRC
1425
+ } yield (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption)
1426
+
1427
+ stderrSizeAndReturnCodeAndMemoryRetry flatMap {
1428
+ case (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption) =>
1429
+ val tryReturnCodeAsInt = Try (returnCodeAsString.trim.toInt)
1430
+
1431
+ if (isDone(status)) {
1432
+ tryReturnCodeAsInt match {
1433
+ case Success (returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
1434
+ val executionHandle = Future .successful(
1435
+ FailedNonRetryableExecutionHandle (StderrNonEmpty (jobDescriptor.key.tag, stderrSize, stderrAsOption),
1436
+ Option (returnCodeAsInt),
1437
+ None
1438
+ )
1426
1439
)
1427
- )
1428
- retryElseFail(executionHandle)
1429
- case Success (returnCodeAsInt) if continueOnReturnCode.continueFor( returnCodeAsInt) =>
1430
- handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
1431
- // It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
1432
- // if it was caused by OOM killer, want to handle as OOM and not job abort.
1433
- case Success (returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
1434
- val executionHandle = Future .successful (
1435
- FailedNonRetryableExecutionHandle (
1436
- RetryWithMoreMemory (jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log ),
1437
- Option (returnCodeAsInt),
1438
- None
1440
+ retryElseFail(executionHandle )
1441
+ case Success (returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
1442
+ handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
1443
+ // It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
1444
+ // if it was caused by OOM killer, want to handle as OOM and not job abort.
1445
+ case Success (returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
1446
+ val executionHandle = Future .successful(
1447
+ FailedNonRetryableExecutionHandle (
1448
+ RetryWithMoreMemory (jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
1449
+ Option (returnCodeAsInt ),
1450
+ None
1451
+ )
1439
1452
)
1440
- )
1441
- retryElseFail(executionHandle, outOfMemoryDetected)
1442
- case Success (returnCodeAsInt) if isAbort(returnCodeAsInt) =>
1443
- Future .successful(AbortedExecutionHandle )
1444
- case Success (returnCodeAsInt) =>
1445
- val executionHandle = Future .successful(
1446
- FailedNonRetryableExecutionHandle (WrongReturnCode (jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
1447
- Option (returnCodeAsInt),
1448
- None
1453
+ retryElseFail(executionHandle, outOfMemoryDetected)
1454
+ case Success (returnCodeAsInt) if isAbort(returnCodeAsInt) =>
1455
+ Future .successful(AbortedExecutionHandle )
1456
+ case Success (returnCodeAsInt) =>
1457
+ val executionHandle = Future .successful(
1458
+ FailedNonRetryableExecutionHandle (
1459
+ WrongReturnCode (jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
1460
+ Option (returnCodeAsInt),
1461
+ None
1462
+ )
1449
1463
)
1450
- )
1451
- retryElseFail(executionHandle)
1452
- case Failure (_) =>
1453
- Future .successful (
1454
- FailedNonRetryableExecutionHandle (
1455
- ReturnCodeIsNotAnInt (jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
1456
- kvPairsToSave = None
1464
+ retryElseFail(executionHandle )
1465
+ case Failure (_) =>
1466
+ Future .successful(
1467
+ FailedNonRetryableExecutionHandle (
1468
+ ReturnCodeIsNotAnInt (jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
1469
+ kvPairsToSave = None
1470
+ )
1457
1471
)
1458
- )
1459
- }
1460
- } else {
1461
- tryReturnCodeAsInt match {
1462
- case Success (returnCodeAsInt)
1463
- if outOfMemoryDetected && memoryRetryRequested && ! continueOnReturnCode.continueFor(returnCodeAsInt) =>
1464
- val executionHandle = Future .successful (
1465
- FailedNonRetryableExecutionHandle (
1466
- RetryWithMoreMemory (jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log ),
1467
- Option (returnCodeAsInt),
1468
- None
1472
+ }
1473
+ } else {
1474
+ tryReturnCodeAsInt match {
1475
+ case Success (returnCodeAsInt)
1476
+ if outOfMemoryDetected && memoryRetryRequested && ! continueOnReturnCode.continueFor (returnCodeAsInt) =>
1477
+ val executionHandle = Future .successful(
1478
+ FailedNonRetryableExecutionHandle (
1479
+ RetryWithMoreMemory (jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
1480
+ Option (returnCodeAsInt ),
1481
+ None
1482
+ )
1469
1483
)
1470
- )
1471
- retryElseFail(executionHandle, outOfMemoryDetected)
1472
- case _ =>
1473
- val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption )
1474
- retryElseFail(failureStatus)
1484
+ retryElseFail(executionHandle, outOfMemoryDetected )
1485
+ case _ =>
1486
+ val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
1487
+ retryElseFail(failureStatus )
1488
+ }
1475
1489
}
1476
- }
1477
1490
} recoverWith { case exception =>
1478
1491
if (isDone(status)) Future .successful(FailedNonRetryableExecutionHandle (exception, kvPairsToSave = None ))
1479
1492
else {
0 commit comments