Skip to content

Commit bd8aab2

Browse files
jedcunninghamCloud Composer Team
authored and
Cloud Composer Team
committed
Serialize _is_setup and _is_teardown on tasks (#30009)
We should make sure these attrs are in the serialized DAG so the scheduler knows if the tasks are setup or teardown tasks. GitOrigin-RevId: c7149c002b14606b3118e97eb676c3e4b6ff98b4
1 parent 269b16a commit bd8aab2

File tree

2 files changed

+32
-0
lines changed

2 files changed

+32
-0
lines changed

airflow/models/baseoperator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,9 @@ class derived from this one results in the creation of a task object,
686686
# Set to True for an operator instantiated by a mapped operator.
687687
__from_mapped = False
688688

689+
_is_setup = False
690+
_is_teardown = False
691+
689692
def __init__(
690693
self,
691694
task_id: str,
@@ -1472,6 +1475,8 @@ def get_serialized_fields(cls):
14721475
"template_fields",
14731476
"template_fields_renderers",
14741477
"params",
1478+
"_is_setup",
1479+
"_is_teardown",
14751480
}
14761481
)
14771482
DagContext.pop_context_managed_dag()

tests/serialization/test_dag_serialization.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i
159159
"_task_type": "BashOperator",
160160
"_task_module": "airflow.operators.bash",
161161
"pool": "default_pool",
162+
"_is_setup": False,
163+
"_is_teardown": False,
162164
"executor_config": {
163165
"__type": "dict",
164166
"__var": {
@@ -188,6 +190,8 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i
188190
"_operator_name": "@custom",
189191
"_task_module": "tests.test_utils.mock_operators",
190192
"pool": "default_pool",
193+
"_is_setup": False,
194+
"_is_teardown": False,
191195
},
192196
],
193197
"schedule_interval": {"__type": "timedelta", "__var": 86400.0},
@@ -1304,6 +1308,11 @@ def check_task_group(node):
13041308
def assert_taskgroup_children(se_task_group, dag_task_group, expected_children):
13051309
assert se_task_group.children.keys() == dag_task_group.children.keys() == expected_children
13061310

1311+
@staticmethod
1312+
def assert_task_is_setup_teardown(task, is_setup: bool = False, is_teardown: bool = False):
1313+
assert task._is_setup == is_setup
1314+
assert task._is_teardown == is_teardown
1315+
13071316
def test_task_group_setup_teardown_tasks(self):
13081317
"""
13091318
Test TaskGroup setup and teardown task serialization/deserialization.
@@ -1335,6 +1344,8 @@ def test_task_group_setup_teardown_tasks(self):
13351344
self.assert_taskgroup_children(
13361345
serialized_dag.task_group, dag.task_group, {"setup", "teardown", "group1"}
13371346
)
1347+
self.assert_task_is_setup_teardown(serialized_dag.task_group.children["setup"], is_setup=True)
1348+
self.assert_task_is_setup_teardown(serialized_dag.task_group.children["teardown"], is_teardown=True)
13381349

13391350
se_first_group = serialized_dag.task_group.children["group1"]
13401351
dag_first_group = dag.task_group.children["group1"]
@@ -1343,6 +1354,9 @@ def test_task_group_setup_teardown_tasks(self):
13431354
dag_first_group,
13441355
{"group1.setup1", "group1.task1", "group1.group2", "group1.teardown1"},
13451356
)
1357+
self.assert_task_is_setup_teardown(se_first_group.children["group1.setup1"], is_setup=True)
1358+
self.assert_task_is_setup_teardown(se_first_group.children["group1.task1"])
1359+
self.assert_task_is_setup_teardown(se_first_group.children["group1.teardown1"], is_teardown=True)
13461360

13471361
se_second_group = se_first_group.children["group1.group2"]
13481362
dag_second_group = dag_first_group.children["group1.group2"]
@@ -1351,6 +1365,11 @@ def test_task_group_setup_teardown_tasks(self):
13511365
dag_second_group,
13521366
{"group1.group2.setup2", "group1.group2.task2", "group1.group2.teardown2"},
13531367
)
1368+
self.assert_task_is_setup_teardown(se_second_group.children["group1.group2.setup2"], is_setup=True)
1369+
self.assert_task_is_setup_teardown(se_second_group.children["group1.group2.task2"])
1370+
self.assert_task_is_setup_teardown(
1371+
se_second_group.children["group1.group2.teardown2"], is_teardown=True
1372+
)
13541373

13551374
def test_task_group_setup_teardown_taskgroups(self):
13561375
"""
@@ -1391,22 +1410,30 @@ def teardown_group():
13911410
self.assert_taskgroup_children(
13921411
serialized_dag.task_group, dag.task_group, {"setup_group", "sometask", "teardown_group"}
13931412
)
1413+
self.assert_task_is_setup_teardown(serialized_dag.task_group.children["sometask"])
13941414

13951415
se_setup_group = serialized_dag.task_group.children["setup_group"]
13961416
dag_setup_group = dag.task_group.children["setup_group"]
13971417
self.assert_taskgroup_children(
13981418
se_setup_group, dag_setup_group, {"setup_group.setup1", "setup_group.sub_setup"}
13991419
)
1420+
self.assert_task_is_setup_teardown(se_setup_group.children["setup_group.setup1"], is_setup=True)
14001421

14011422
se_sub_setup_group = se_setup_group.children["setup_group.sub_setup"]
14021423
dag_sub_setup_group = dag_setup_group.children["setup_group.sub_setup"]
14031424
self.assert_taskgroup_children(
14041425
se_sub_setup_group, dag_sub_setup_group, {"setup_group.sub_setup.setup2"}
14051426
)
1427+
self.assert_task_is_setup_teardown(
1428+
se_sub_setup_group.children["setup_group.sub_setup.setup2"], is_setup=True
1429+
)
14061430

14071431
se_teardown_group = serialized_dag.task_group.children["teardown_group"]
14081432
dag_teardown_group = dag.task_group.children["teardown_group"]
14091433
self.assert_taskgroup_children(se_teardown_group, dag_teardown_group, {"teardown_group.teardown1"})
1434+
self.assert_task_is_setup_teardown(
1435+
se_teardown_group.children["teardown_group.teardown1"], is_teardown=True
1436+
)
14101437

14111438
def test_deps_sorted(self):
14121439
"""

0 commit comments

Comments
 (0)