on_error callbacks don't fire in task chains #20

@kriberg

Description


I'm having trouble getting on_error task callbacks attached to chains to trigger. I've created a test project to reproduce it here:
https://github.com/kriberg/dyrygent-test

This defines three tasks:

import logging
import time

from celery import Celery

# broker/backend configuration lives in the linked test project
app = Celery("test")
log = logging.getLogger(__name__)


@app.task
def normal_task():
    log.info("normal task")
    time.sleep(2)


@app.task(throws=(Exception,))
def failing_task():
    log.info("failing task")
    time.sleep(2)
    raise Exception("failure")


@app.task
def callback(msg, *args, **kwargs):
    log.error(f"error called: {msg} {args} {kwargs}")

These are put into a chain:

from celery import chain

chain1 = chain(normal_task.si(), normal_task.si(), failing_task.si())
chain1.on_error(callback.si("Leaf chain 1 failed"))
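
As far as I can tell from celery's canvas code, on_error stores the errback in the chain signature's own options rather than on the member tasks, which may be relevant here. A quick check with the objects above:

print(chain1.options.get("link_error"))            # the callback signature
print(chain1.tasks[-1].options.get("link_error"))  # None until apply time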

Calling this with celery-dyrygent:

from celery_dyrygent.workflows import Workflow

wf = Workflow()
wf.set_retry_policy("random", 1, 3)
wf.add_celery_canvas(chain1)
result = wf.apply_async(options={"link_error": callback.si("wf error")})

This produces the following log:

[2021-11-08 12:55:24,163: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:24,168: INFO/ForkPoolWorker-8] Scheduling execution of task 15b38035-8e7b-4857-be5c-9c6e44f4f438 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:24,180: INFO/ForkPoolWorker-8] Tick done, took 0.015532ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:24,180: INFO/MainProcess] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] received
[2021-11-08 12:55:24,181: INFO/ForkPoolWorker-1] normal task
[2021-11-08 12:55:24,183: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:24,184: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:26,197: INFO/ForkPoolWorker-1] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] succeeded in 2.0154468250002537s: None
[2021-11-08 12:55:26,201: INFO/ForkPoolWorker-8] Task 15b38035-8e7b-4857-be5c-9c6e44f4f438 is done, success
[2021-11-08 12:55:26,204: INFO/ForkPoolWorker-8] Scheduling execution of task 504684c4-257e-495f-8419-237c7442d954 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:26,206: INFO/ForkPoolWorker-8] Tick done, took 0.005649ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:26,207: INFO/MainProcess] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] received
[2021-11-08 12:55:26,208: INFO/ForkPoolWorker-1] normal task
[2021-11-08 12:55:26,211: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:26,212: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:28,212: INFO/ForkPoolWorker-1] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] succeeded in 2.004066970999702s: None
[2021-11-08 12:55:28,217: INFO/ForkPoolWorker-8] Task 504684c4-257e-495f-8419-237c7442d954 is done, success
[2021-11-08 12:55:28,220: INFO/ForkPoolWorker-8] Scheduling execution of task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:28,222: INFO/ForkPoolWorker-8] Tick done, took 0.006836ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:28,224: INFO/MainProcess] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] received
[2021-11-08 12:55:28,225: INFO/ForkPoolWorker-1] failing task
[2021-11-08 12:55:28,227: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:28,228: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:30,230: ERROR/ForkPoolWorker-1] error called: wf error () {}
[2021-11-08 12:55:30,230: INFO/ForkPoolWorker-1] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] raised expected: Exception('failure')
[2021-11-08 12:55:30,233: INFO/ForkPoolWorker-8] Tick done, took 0.000878ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:30,237: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:30,238: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 1s
[2021-11-08 12:55:32,425: INFO/ForkPoolWorker-8] Tick done, took 0.001262ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:32,431: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:32,433: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:34,458: INFO/ForkPoolWorker-8] Tick done, took 0.000845ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:34,462: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:34,463: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 1s
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 has final state after 4 checks
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 is done, failure, result '<class 'Exception'>(failure)'
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Tick done, took 0.000660ms, workflow finished after 7 ticks
[2021-11-08 12:55:36,430: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] succeeded in 0.0014809360000072047s: None

Here the workflow-level callback (passed via link_error) triggers as intended, but the callback attached to the chain with on_error never fires.
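
Looking at the log, dyrygent schedules each task with only the link_error passed through apply_async, so the chain's own errback seems to get dropped when the canvas is decomposed. A possible workaround, only a sketch since I haven't verified that add_celery_canvas preserves per-signature options, would be to attach the errback to every member task explicitly:

# Sketch, not verified: put the chain-level errback on each member
# signature so it doesn't depend on the chain's own options surviving
# add_celery_canvas().
for sig in chain1.tasks:
    sig.link_error(callback.si("Leaf chain 1 failed"))

wf = Workflow()
wf.set_retry_policy("random", 1, 3)
wf.add_celery_canvas(chain1)
wf.apply_async(options={"link_error": callback.si("wf error")})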

Calling the same chain with celery apply_async:

chain1.on_error(callback.si("master error"))
chain1.apply_async()

Produces this:

[2021-11-08 12:55:46,447: INFO/MainProcess] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] received
[2021-11-08 12:55:46,449: INFO/ForkPoolWorker-8] normal task
[2021-11-08 12:55:48,455: INFO/MainProcess] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] received
[2021-11-08 12:55:48,455: INFO/ForkPoolWorker-8] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] succeeded in 2.0059917730031884s: None
[2021-11-08 12:55:48,456: INFO/ForkPoolWorker-8] normal task
[2021-11-08 12:55:50,462: INFO/ForkPoolWorker-8] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] succeeded in 2.0055490619997727s: None
[2021-11-08 12:55:50,463: INFO/MainProcess] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] received
[2021-11-08 12:55:50,465: INFO/ForkPoolWorker-8] failing task
[2021-11-08 12:55:52,470: ERROR/ForkPoolWorker-8] error called: Leaf chain 1 failed () {}
[2021-11-08 12:55:52,471: ERROR/ForkPoolWorker-8] error called: master error () {}
[2021-11-08 12:55:52,471: INFO/ForkPoolWorker-8] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] raised expected: Exception('failure')

Here both callbacks are triggered correctly, presumably because celery propagates the chain's link_error to each member task when the chain is applied.
Now, as we know, celery doesn't cope well with a large, complex canvas, so running the canvas directly through celery isn't a good option.

Is this a limitation of dyrygent, or is it a bug?
