-
Notifications
You must be signed in to change notification settings - Fork 16
Open
Description
I'm having some issues with triggering of the on_error task callbacks from chains. I've created a test project here:
https://github.com/kriberg/dyrygent-test
This defines three tasks:
@app.task
def normal_task():
log.info("normal task")
time.sleep(2)
@app.task(throws=(Exception,))
def failing_task():
log.info("failing task")
time.sleep(2)
raise Exception("failure")
@app.task
def callback(msg, *args, **kwargs):
log.error(f"error called: {msg} {args} {kwargs}")
These are put into a chain:
chain1 = chain(normal_task.si(), normal_task.si(), failing_task.si())
chain1.on_error(callback.si(f"Leaf chain 1 failed"))
Calling this with celery-dyrygent:
wf = Workflow()
wf.set_retry_policy("random", 1, 3)
wf.add_celery_canvas(chain1)
result = wf.apply_async(options={"link_error": callback.si("wf error")})
This produces the following log:
[2021-11-08 12:55:24,163: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:24,168: INFO/ForkPoolWorker-8] Scheduling execution of task 15b38035-8e7b-4857-be5c-9c6e44f4f438 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:24,180: INFO/ForkPoolWorker-8] Tick done, took 0.015532ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:24,180: INFO/MainProcess] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] received
[2021-11-08 12:55:24,181: INFO/ForkPoolWorker-1] normal task
[2021-11-08 12:55:24,183: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:24,184: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:26,197: INFO/ForkPoolWorker-1] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] succeeded in 2.0154468250002537s: None
[2021-11-08 12:55:26,201: INFO/ForkPoolWorker-8] Task 15b38035-8e7b-4857-be5c-9c6e44f4f438 is done, success
[2021-11-08 12:55:26,204: INFO/ForkPoolWorker-8] Scheduling execution of task 504684c4-257e-495f-8419-237c7442d954 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:26,206: INFO/ForkPoolWorker-8] Tick done, took 0.005649ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:26,207: INFO/MainProcess] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] received
[2021-11-08 12:55:26,208: INFO/ForkPoolWorker-1] normal task
[2021-11-08 12:55:26,211: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:26,212: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:28,212: INFO/ForkPoolWorker-1] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] succeeded in 2.004066970999702s: None
[2021-11-08 12:55:28,217: INFO/ForkPoolWorker-8] Task 504684c4-257e-495f-8419-237c7442d954 is done, success
[2021-11-08 12:55:28,220: INFO/ForkPoolWorker-8] Scheduling execution of task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:28,222: INFO/ForkPoolWorker-8] Tick done, took 0.006836ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:28,224: INFO/MainProcess] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] received
[2021-11-08 12:55:28,225: INFO/ForkPoolWorker-1] failing task
[2021-11-08 12:55:28,227: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:28,228: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:30,230: ERROR/ForkPoolWorker-1] error called: wf error () {}
[2021-11-08 12:55:30,230: INFO/ForkPoolWorker-1] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] raised expected: Exception('failure')
[2021-11-08 12:55:30,233: INFO/ForkPoolWorker-8] Tick done, took 0.000878ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:30,237: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:30,238: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 1s
[2021-11-08 12:55:32,425: INFO/ForkPoolWorker-8] Tick done, took 0.001262ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:32,431: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:32,433: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:34,458: INFO/ForkPoolWorker-8] Tick done, took 0.000845ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:34,462: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:34,463: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 1s
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 has final state after 4 checks
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 is done, failure, result '<class 'Exception'>(failure)'
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Tick done, took 0.000660ms, workflow finished after 7 ticks
[2021-11-08 12:55:36,430: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] succeeded in 0.0014809360000072047s: None
Here we see the callback linked to the overall workflow triggers as intended, but the callback set to the chain never fires.
Calling the same chain with celery apply_async:
chain1.on_error(callback.si("master error"))
chain1.apply_async()
Produces this:
[2021-11-08 12:55:46,447: INFO/MainProcess] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] received
[2021-11-08 12:55:46,449: INFO/ForkPoolWorker-8] normal task
[2021-11-08 12:55:48,455: INFO/MainProcess] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] received
[2021-11-08 12:55:48,455: INFO/ForkPoolWorker-8] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] succeeded in 2.0059917730031884s: None
[2021-11-08 12:55:48,456: INFO/ForkPoolWorker-8] normal task
[2021-11-08 12:55:50,462: INFO/ForkPoolWorker-8] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] succeeded in 2.0055490619997727s: None
[2021-11-08 12:55:50,463: INFO/MainProcess] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] received
[2021-11-08 12:55:50,465: INFO/ForkPoolWorker-8] failing task
[2021-11-08 12:55:52,470: ERROR/ForkPoolWorker-8] error called: Leaf chain 1 failed () {}
[2021-11-08 12:55:52,471: ERROR/ForkPoolWorker-8] error called: master error () {}
[2021-11-08 12:55:52,471: INFO/ForkPoolWorker-8] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] raised expected: Exception('failure')
Here both callbacks are triggered correctly.
Now, as we know, celery doesn't do well with a large complex canvas, so just using celery isn't a good option.
Is this a limitation with dyrygent or is it a bug?
reemaparakh
Metadata
Metadata
Assignees
Labels
No labels