Skip to content

Commit b4c8416

Browse files
[v3-0-test] Add bundle root to sys.path in dag processor (apache#50385) (apache#50509)
(cherry picked from commit 3b93650) Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
1 parent 909d424 commit b4c8416

File tree

4 files changed

+37
-26
lines changed

4 files changed

+37
-26
lines changed

airflow-core/src/airflow/dag_processing/processor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@
6767

6868

6969
def _parse_file_entrypoint():
70-
import os
71-
7270
import structlog
7371

7472
from airflow.sdk.execution_time import task_runner
@@ -87,6 +85,11 @@ def _parse_file_entrypoint():
8785
task_runner.SUPERVISOR_COMMS = comms_decoder
8886
log = structlog.get_logger(logger_name="task")
8987

88+
# Put bundle root on sys.path if needed. This allows the dag bundle to add
89+
# code in util modules to be shared between files within the same bundle.
90+
if (bundle_root := os.fspath(msg.bundle_path)) not in sys.path:
91+
sys.path.append(bundle_root)
92+
9093
result = _parse_file(msg, log)
9194
if result is not None:
9295
comms_decoder.send_request(log, result)

airflow-core/src/airflow/settings.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -560,12 +560,6 @@ def prepare_syspath_for_config_and_plugins():
560560
sys.path.append(PLUGINS_FOLDER)
561561

562562

563-
def prepare_syspath_for_dags_folder():
564-
"""Update sys.path to include the DAGs folder."""
565-
if DAGS_FOLDER not in sys.path:
566-
sys.path.append(DAGS_FOLDER)
567-
568-
569563
def import_local_settings():
570564
"""Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
571565
try:
@@ -612,7 +606,6 @@ def initialize():
612606
# in airflow_local_settings to take precendec
613607
load_policy_plugins(POLICY_PLUGIN_MANAGER)
614608
import_local_settings()
615-
prepare_syspath_for_dags_folder()
616609
global LOGGING_CLASS_PATH
617610
LOGGING_CLASS_PATH = configure_logging()
618611

airflow-core/tests/unit/core/test_settings.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,26 +91,16 @@ def teardown_method(self):
9191

9292
@mock.patch("airflow.settings.prepare_syspath_for_config_and_plugins")
9393
@mock.patch("airflow.settings.import_local_settings")
94-
@mock.patch("airflow.settings.prepare_syspath_for_dags_folder")
95-
def test_initialize_order(
96-
self,
97-
mock_prepare_syspath_for_dags_folder,
98-
mock_import_local_settings,
99-
mock_prepare_syspath_for_config_and_plugins,
100-
):
94+
def test_initialize_order(self, mock_import_local_settings, mock_prepare_syspath_for_config_and_plugins):
10195
"""
102-
Tests that import_local_settings is called between prepare_syspath_for_config_and_plugins
103-
and prepare_syspath_for_dags_folder
96+
Tests that import_local_settings is called after prepare_syspath_for_config_and_plugins
10497
"""
10598
mock_local_settings = mock.Mock()
10699

107100
mock_local_settings.attach_mock(
108101
mock_prepare_syspath_for_config_and_plugins, "prepare_syspath_for_config_and_plugins"
109102
)
110103
mock_local_settings.attach_mock(mock_import_local_settings, "import_local_settings")
111-
mock_local_settings.attach_mock(
112-
mock_prepare_syspath_for_dags_folder, "prepare_syspath_for_dags_folder"
113-
)
114104

115105
import airflow.settings
116106

@@ -119,7 +109,6 @@ def test_initialize_order(
119109
expected_calls = [
120110
call.prepare_syspath_for_config_and_plugins(),
121111
call.import_local_settings(),
122-
call.prepare_syspath_for_dags_folder(),
123112
]
124113

125114
mock_local_settings.assert_has_calls(expected_calls)

airflow-core/tests/unit/dag_processing/test_processor.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ def fake_collect_dags(dagbag: DagBag, *args, **kwargs):
129129
assert resp.import_errors is not None
130130
assert "a.py" in resp.import_errors
131131

132-
# @pytest.mark.execution_timeout(10)
133132
def test_top_level_variable_access(
134133
self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
135134
):
@@ -271,9 +270,7 @@ def dag_in_a_fn():
271270
assert result.import_errors == {}
272271
assert result.serialized_dags[0].dag_id == "test_my_conn"
273272

274-
def test_top_level_connection_access_not_found(
275-
self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
276-
):
273+
def test_top_level_connection_access_not_found(self, tmp_path: pathlib.Path):
277274
logger_filehandle = MagicMock()
278275

279276
def dag_in_a_fn():
@@ -297,6 +294,35 @@ def dag_in_a_fn():
297294
if result.import_errors:
298295
assert "CONNECTION_NOT_FOUND" in next(iter(result.import_errors.values()))
299296

297+
def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path):
298+
tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")
299+
300+
dag1_path = tmp_path.joinpath("dag1.py")
301+
dag1_code = """
302+
from util import NAME
303+
304+
from airflow.sdk import DAG
305+
306+
with DAG(NAME):
307+
pass
308+
"""
309+
dag1_path.write_text(textwrap.dedent(dag1_code))
310+
311+
proc = DagFileProcessorProcess.start(
312+
id=1,
313+
path=dag1_path,
314+
bundle_path=tmp_path,
315+
callbacks=[],
316+
logger_filehandle=MagicMock(),
317+
)
318+
while not proc.is_ready:
319+
proc._service_subprocess(0.1)
320+
321+
result = proc.parsing_result
322+
assert result is not None
323+
assert result.import_errors == {}
324+
assert result.serialized_dags[0].dag_id == "dag_name"
325+
300326

301327
def write_dag_in_a_fn_to_file(fn: Callable[[], None], folder: pathlib.Path) -> pathlib.Path:
302328
# Create the dag in a fn, and use inspect.getsource to write it to a file so that

0 commit comments

Comments
 (0)