Skip to content

Commit 18f7de1

Browse files
jorgeepditommasobentsherman
authored
Fix Google Batch hang when internal error during scheduling (#5567)
Signed-off-by: jorgee <jorge.ejarque@seqera.io> Signed-off-by: Jorge Ejarque <jorgee@users.noreply.github.com> Signed-off-by: Ben Sherman <bentshermann@gmail.com> Co-authored-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com> Co-authored-by: Ben Sherman <bentshermann@gmail.com>
1 parent 3d71468 commit 18f7de1

File tree

3 files changed

+75
-8
lines changed

3 files changed

+75
-8
lines changed

plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ import nextflow.trace.TraceRecord
6060
@CompileStatic
6161
class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
6262

63-
private static Pattern EXIT_CODE_REGEX = ~/exit code 500(\d\d)/
63+
private static final Pattern EXIT_CODE_REGEX = ~/exit code 500(\d\d)/
64+
65+
private static final Pattern BATCH_ERROR_REGEX = ~/Batch Error: code/
6466

6567
private GoogleBatchExecutor executor
6668

@@ -98,6 +100,11 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
98100

99101
private volatile long timestamp
100102

103+
/**
104+
* A flag to indicate that the job has failed without launching any tasks
105+
*/
106+
private volatile boolean noTaskJobfailure
107+
101108
GoogleBatchTaskHandler(TaskRun task, GoogleBatchExecutor executor) {
102109
super(task)
103110
this.client = executor.getClient()
@@ -445,9 +452,10 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
445452
*/
446453
protected String getTaskState() {
447454
final tasks = client.listTasks(jobId)
448-
if( !tasks.iterator().hasNext() )
449-
return 'PENDING'
450-
455+
if( !tasks.iterator().hasNext() ) {
456+
// if there are no tasks checks the job status
457+
return checkJobStatus()
458+
}
451459
final now = System.currentTimeMillis()
452460
final delta = now - timestamp;
453461
if( !taskState || delta >= 1_000) {
@@ -468,6 +476,21 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
468476
return taskState
469477
}
470478

479+
protected String checkJobStatus() {
480+
final jobStatus = client.getJobStatus(jobId)
481+
final newState = jobStatus?.state as String
482+
if (newState) {
483+
taskState = newState
484+
timestamp = System.currentTimeMillis()
485+
if (newState == "FAILED") {
486+
noTaskJobfailure = true
487+
}
488+
return taskState
489+
} else {
490+
return "PENDING"
491+
}
492+
}
493+
471494
static private final List<String> RUNNING_OR_COMPLETED = ['RUNNING', 'SUCCEEDED', 'FAILED']
472495

473496
static private final List<String> COMPLETED = ['SUCCEEDED', 'FAILED']
@@ -510,13 +533,14 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
510533

511534
protected Throwable getJobError() {
512535
try {
513-
final status = client.getTaskStatus(jobId, taskId)
514-
final eventsCount = status.getStatusEventsCount()
515-
final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null
536+
final events = noTaskJobfailure
537+
? client.getJobStatus(jobId).getStatusEventsList()
538+
: client.getTaskStatus(jobId, taskId).getStatusEventsList()
539+
final lastEvent = events?.get(events.size() - 1)
516540
log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - last event: ${lastEvent}; exit code: ${lastEvent?.taskExecution?.exitCode}"
517541

518542
final error = lastEvent?.description
519-
if( error && EXIT_CODE_REGEX.matcher(error).find() ) {
543+
if( error && (EXIT_CODE_REGEX.matcher(error).find() || BATCH_ERROR_REGEX.matcher(error).find()) ) {
520544
return new ProcessException(error)
521545
}
522546
}

plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchClient.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import com.google.cloud.batch.v1.BatchServiceClient
2929
import com.google.cloud.batch.v1.BatchServiceSettings
3030
import com.google.cloud.batch.v1.Job
3131
import com.google.cloud.batch.v1.JobName
32+
import com.google.cloud.batch.v1.JobStatus
3233
import com.google.cloud.batch.v1.LocationName
3334
import com.google.cloud.batch.v1.Task
3435
import com.google.cloud.batch.v1.TaskGroupName
@@ -123,6 +124,10 @@ class BatchClient {
123124
return describeTask(jobId, taskId).getStatus()
124125
}
125126

127+
JobStatus getJobStatus(String jobId) {
128+
return describeJob(jobId).getStatus()
129+
}
130+
126131
String getTaskState(String jobId, String taskId) {
127132
final status = getTaskStatus(jobId, taskId)
128133
return status ? status.getState().toString() : null

plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package nextflow.cloud.google.batch
1919

20+
import com.google.cloud.batch.v1.JobStatus
21+
import com.google.cloud.batch.v1.Task
22+
2023
import java.nio.file.Path
2124

2225
import com.google.cloud.batch.v1.GCS
@@ -581,4 +584,39 @@ class GoogleBatchTaskHandlerTest extends Specification {
581584
and:
582585
0 * client.deleteJob('job1') >> null
583586
}
587+
588+
JobStatus makeJobStatus(JobStatus.State state, String desc = null) {
589+
final builder = JobStatus.newBuilder().setState(state)
590+
if( desc ) {
591+
builder.addStatusEvents(
592+
StatusEvent.newBuilder()
593+
.setDescription(desc)
594+
)
595+
}
596+
builder.build()
597+
}
598+
599+
def 'should check job status when no tasks in job '() {
600+
601+
given:
602+
def jobId = 'job-id'
603+
def taskId = 'task-id'
604+
def client = Mock(BatchClient)
605+
def task = Mock(TaskRun) {
606+
lazyName() >> 'foo (1)'
607+
}
608+
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task))
609+
final message = 'Job failed when Batch tries to schedule it: Batch Error: code - CODE_MACHINE_TYPE_NOT_FOUND'
610+
when:
611+
client.listTasks(jobId) >>> [new LinkedList<Task>(), new LinkedList<Task>()]
612+
client.getJobStatus(jobId) >>> [
613+
null,
614+
makeJobStatus(JobStatus.State.FAILED, 'Scheduling Failed'),
615+
makeJobStatus(JobStatus.State.FAILED, message)
616+
]
617+
then:
618+
handler.getTaskState() == "PENDING"
619+
handler.getTaskState() == "FAILED"
620+
handler.getJobError().message == message
621+
}
584622
}

0 commit comments

Comments
 (0)