-
Notifications
You must be signed in to change notification settings - Fork 15
Open
Description
Hi there,
I have been working on some flows that catch jobs that time out and noticed that some of my child jobs were stuck as 'Waiting' even though I'd set completion_checker_job.config.on_missing_references = OnMissing.NONE
. @utf and I have worked out that this is because the config isn't being passed on to the children of completion_checker_job
. I have written a minimum working example below:
from jobflow import Flow, Response, job, OnMissing
import time
@job
def double(a):
if a == 1:
time.sleep(10000) # Simulate some jobs taking too long to complete
else:
return a * 2
@job
def controller(a):
job = double(a)
completion_checker_job = completion_checker(job.output, a)
completion_checker_job.config.on_missing_references = OnMissing.NONE
flow = Flow(jobs=[job, completion_checker_job], output=completion_checker_job.output)
return Response(replace=flow)
@job
def completion_checker(output, a):
if output is None:
controller_job = controller(2)
return Response(replace=controller_job)
else:
return output
controller_job = controller(a)
This can be fixed by setting the config for all child jobs:
from jobflow import Flow, Response, job, OnMissing
import time
@job
def double(a):
if a == 1:
time.sleep(10000) # Simulate some jobs taking too long to complete
else:
return a * 2
@job
def controller(a):
job = double(a)
job.config.on_missing_references = OnMissing.NONE
completion_checker_job = completion_checker(job.output, a)
completion_checker_job.config.on_missing_references = OnMissing.NONE
flow = Flow(jobs=[job, completion_checker_job], output=completion_checker_job.output)
return Response(replace=flow)
@job
def completion_checker(output, a):
if output is None:
controller_job = controller(2)
controller_job.config.on_missing_references = OnMissing.NONE
return Response(replace=controller_job)
else:
return output
controller_job = controller(a)
Metadata
Metadata
Assignees
Labels
No labels