
Commit 00a8adb

sryza authored and gengliangwang committed
[SPARK-52663][SDP] Introduce name field to pipeline spec
### What changes were proposed in this pull request?

The Declarative Pipelines SPIP included a "name" field in the pipeline spec, but we left that out in the earlier implementation. This adds it in. The name field is required. This matches behavior for similar systems, like dbt.

### Why are the changes needed?

See above.

### Does this PR introduce _any_ user-facing change?

Yes, but only to unreleased code.

### How was this patch tested?

Updated existing tests, and added tests for the proper error when the name is missing.

### Was this patch authored or co-authored using generative AI tooling?

Closes #51353 from sryza/pipeline-name.

Authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent c2942b7 commit 00a8adb

6 files changed: +62, -8 lines changed

docs/declarative-pipelines-programming-guide.md

Lines changed: 2 additions & 1 deletion

````diff
@@ -75,6 +75,7 @@ A YAML-formatted pipeline spec file contains the top-level configuration for the
 An example pipeline spec file:
 
 ```yaml
+name: my_pipeline
 definitions:
   - glob:
       include: transformations/**/*.py
@@ -99,7 +100,7 @@ The `spark-pipelines` command line interface (CLI) is the primary way to execute
 
 ### `spark-pipelines init`
 
-`spark-pipelines init` generates a simple pipeline project, including a spec file and example definitions.
+`spark-pipelines init --name my_pipeline` generates a simple pipeline project, inside a directory named "my_pipeline", including a spec file and example definitions.
 
 ### `spark-pipelines run`
````

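For reference, the `name` field documented above is read back when the spec file is loaded. Below is a minimal sketch of that round trip, assuming `load_pipeline_spec` is importable from `pyspark.pipelines.cli`, as the updated tests further down in this commit suggest:

```python
# Sketch: write the documented example spec to a temp file and load it back.
# load_pipeline_spec and the .name / .definitions attributes mirror the
# updated test_cli.py tests in this commit.
import tempfile
from pathlib import Path

from pyspark.pipelines.cli import load_pipeline_spec

spec_yaml = """
name: my_pipeline
definitions:
  - glob:
      include: transformations/**/*.py
"""

with tempfile.NamedTemporaryFile(mode="w", suffix=".yml") as tmpfile:
    tmpfile.write(spec_yaml)
    tmpfile.flush()
    spec = load_pipeline_spec(Path(tmpfile.name))
    assert spec.name == "my_pipeline"
    assert spec.definitions[0].include == "transformations/**/*.py"
```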
python/pyspark/errors/error-conditions.json

Lines changed: 5 additions & 0 deletions

```diff
@@ -884,6 +884,11 @@
       "No pipeline.yaml or pipeline.yml file provided in arguments or found in directory `<dir_path>` or readable ancestor directories."
     ]
   },
+  "PIPELINE_SPEC_MISSING_REQUIRED_FIELD": {
+    "message": [
+      "Pipeline spec missing required field `<field_name>`."
+    ]
+  },
   "PIPELINE_SPEC_UNEXPECTED_FIELD": {
     "message": [
       "Pipeline spec field `<field_name>` is unexpected."
```

python/pyspark/pipelines/cli.py

Lines changed: 13 additions & 1 deletion

```diff
@@ -61,12 +61,14 @@ class DefinitionsGlob:
 class PipelineSpec:
     """Spec for a pipeline.
 
+    :param name: The name of the pipeline.
     :param catalog: The default catalog to use for the pipeline.
     :param database: The default database to use for the pipeline.
     :param configuration: A dictionary of Spark configuration properties to set for the pipeline.
    :param definitions: A list of glob patterns for finding pipeline definitions files.
     """
 
+    name: str
     catalog: Optional[str]
     database: Optional[str]
     configuration: Mapping[str, str]
@@ -110,13 +112,23 @@ def load_pipeline_spec(spec_path: Path) -> PipelineSpec:
 
 
 def unpack_pipeline_spec(spec_data: Mapping[str, Any]) -> PipelineSpec:
+    ALLOWED_FIELDS = {"name", "catalog", "database", "schema", "configuration", "definitions"}
+    REQUIRED_FIELDS = ["name"]
     for key in spec_data.keys():
-        if key not in ["catalog", "database", "schema", "configuration", "definitions"]:
+        if key not in ALLOWED_FIELDS:
             raise PySparkException(
                 errorClass="PIPELINE_SPEC_UNEXPECTED_FIELD", messageParameters={"field_name": key}
             )
 
+    for key in REQUIRED_FIELDS:
+        if key not in spec_data:
+            raise PySparkException(
+                errorClass="PIPELINE_SPEC_MISSING_REQUIRED_FIELD",
+                messageParameters={"field_name": key},
+            )
+
     return PipelineSpec(
+        name=spec_data["name"],
         catalog=spec_data.get("catalog"),
         database=spec_data.get("database", spec_data.get("schema")),
         configuration=validate_str_dict(spec_data.get("configuration", {}), "configuration"),
```

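To illustrate the new validation path, here is a small sketch of how `unpack_pipeline_spec` behaves with and without the required `name` field. It only uses calls that appear in this commit's diff; the error accessors (`getCondition`, `getMessageParameters`) are the same ones the updated tests below use:

```python
# Sketch of the new required-field validation in unpack_pipeline_spec.
from pyspark.errors import PySparkException
from pyspark.pipelines.cli import unpack_pipeline_spec

# A spec containing the required "name" field unpacks normally.
spec = unpack_pipeline_spec({"name": "my_pipeline", "catalog": "my_catalog"})
assert spec.name == "my_pipeline"
assert spec.catalog == "my_catalog"

# A spec without "name" now raises PIPELINE_SPEC_MISSING_REQUIRED_FIELD.
try:
    unpack_pipeline_spec({"catalog": "my_catalog"})
except PySparkException as e:
    assert e.getCondition() == "PIPELINE_SPEC_MISSING_REQUIRED_FIELD"
    assert e.getMessageParameters() == {"field_name": "name"}
```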
python/pyspark/pipelines/init_cli.py

Lines changed: 2 additions & 1 deletion

```diff
@@ -18,6 +18,7 @@
 from pathlib import Path
 
 SPEC = """
+name: {{ name }}
 definitions:
   - glob:
       include: transformations/**/*.py
@@ -49,7 +50,7 @@ def init(name: str) -> None:
     # Write the spec file to the project directory
     spec_file = project_dir / "pipeline.yml"
     with open(spec_file, "w") as f:
-        f.write(SPEC)
+        f.write(SPEC.replace("{{ name }}", name))
 
     # Create the transformations directory
     transformations_dir = project_dir / "transformations"
```

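End to end, the generated project now carries the chosen name through to the loaded spec. A rough sketch of that flow, assuming `init` scaffolds the project under the current working directory (as the docs change describes) and that `find_pipeline_spec` and `load_pipeline_spec` are importable from `pyspark.pipelines.cli`:

```python
# Sketch of the init flow: scaffold a project, then load its spec and check the name.
import os
import tempfile
from pathlib import Path

from pyspark.pipelines.cli import find_pipeline_spec, load_pipeline_spec
from pyspark.pipelines.init_cli import init

original_cwd = Path.cwd()
with tempfile.TemporaryDirectory() as temp_dir:
    os.chdir(temp_dir)
    try:
        # Assumed behavior per this commit: creates my_pipeline/pipeline.yml with
        # the name substituted in, plus a transformations/ directory of examples.
        init("my_pipeline")
        os.chdir(Path(temp_dir) / "my_pipeline")
        spec = load_pipeline_spec(find_pipeline_spec(Path.cwd()))
        assert spec.name == "my_pipeline"
    finally:
        os.chdir(original_cwd)
```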
python/pyspark/pipelines/tests/test_cli.py

Lines changed: 39 additions & 5 deletions

```diff
@@ -50,6 +50,7 @@ def test_load_pipeline_spec(self):
             tmpfile.write(
                 """
                 {
+                    "name": "test_pipeline",
                     "catalog": "test_catalog",
                     "database": "test_database",
                     "configuration": {
@@ -64,17 +65,44 @@ def test_load_pipeline_spec(self):
             )
             tmpfile.flush()
             spec = load_pipeline_spec(Path(tmpfile.name))
+            assert spec.name == "test_pipeline"
             assert spec.catalog == "test_catalog"
             assert spec.database == "test_database"
             assert spec.configuration == {"key1": "value1", "key2": "value2"}
             assert len(spec.definitions) == 1
             assert spec.definitions[0].include == "test_include"
 
+    def test_load_pipeline_spec_name_is_required(self):
+        with tempfile.NamedTemporaryFile(mode="w") as tmpfile:
+            tmpfile.write(
+                """
+                {
+                    "catalog": "test_catalog",
+                    "database": "test_database",
+                    "configuration": {
+                        "key1": "value1",
+                        "key2": "value2"
+                    },
+                    "definitions": [
+                        {"glob": {"include": "test_include"}}
+                    ]
+                }
+                """
+            )
+            tmpfile.flush()
+            with self.assertRaises(PySparkException) as context:
+                load_pipeline_spec(Path(tmpfile.name))
+            self.assertEqual(
+                context.exception.getCondition(), "PIPELINE_SPEC_MISSING_REQUIRED_FIELD"
+            )
+            self.assertEqual(context.exception.getMessageParameters(), {"field_name": "name"})
+
     def test_load_pipeline_spec_schema_fallback(self):
         with tempfile.NamedTemporaryFile(mode="w") as tmpfile:
             tmpfile.write(
                 """
                 {
+                    "name": "test_pipeline",
                     "catalog": "test_catalog",
                     "schema": "test_database",
                     "configuration": {
@@ -120,20 +148,22 @@ def test_load_pipeline_spec_invalid(self):
                 )
 
     def test_unpack_empty_pipeline_spec(self):
-        empty_spec = PipelineSpec(catalog=None, database=None, configuration={}, definitions=[])
-        self.assertEqual(unpack_pipeline_spec({}), empty_spec)
+        empty_spec = PipelineSpec(
+            name="test_pipeline", catalog=None, database=None, configuration={}, definitions=[]
+        )
+        self.assertEqual(unpack_pipeline_spec({"name": "test_pipeline"}), empty_spec)
 
     def test_unpack_pipeline_spec_bad_configuration(self):
         with self.assertRaises(TypeError) as context:
-            unpack_pipeline_spec({"configuration": "not_a_dict"})
+            unpack_pipeline_spec({"name": "test_pipeline", "configuration": "not_a_dict"})
         self.assertIn("should be a dict", str(context.exception))
 
         with self.assertRaises(TypeError) as context:
-            unpack_pipeline_spec({"configuration": {"key": {}}})
+            unpack_pipeline_spec({"name": "test_pipeline", "configuration": {"key": {}}})
         self.assertIn("key", str(context.exception))
 
         with self.assertRaises(TypeError) as context:
-            unpack_pipeline_spec({"configuration": {1: "something"}})
+            unpack_pipeline_spec({"name": "test_pipeline", "configuration": {1: "something"}})
         self.assertIn("int", str(context.exception))
 
     def test_find_pipeline_spec_in_current_directory(self):
@@ -205,6 +235,7 @@ def test_find_pipeline_spec_in_parent_directory(self):
 
     def test_register_definitions(self):
         spec = PipelineSpec(
+            name="test_pipeline",
             catalog=None,
             database=None,
             configuration={},
@@ -247,6 +278,7 @@ def mv2():
     def test_register_definitions_file_raises_error(self):
         """Errors raised while executing definitions code should make it to the outer context."""
         spec = PipelineSpec(
+            name="test_pipeline",
             catalog=None,
             database=None,
             configuration={},
@@ -264,6 +296,7 @@ def test_register_definitions_file_raises_error(self):
 
     def test_register_definitions_unsupported_file_extension_matches_glob(self):
         spec = PipelineSpec(
+            name="test_pipeline",
             catalog=None,
             database=None,
             configuration={},
@@ -317,6 +350,7 @@ def test_python_import_current_directory(self):
                 inner_dir1 / "pipeline.yaml",
                 registry,
                 PipelineSpec(
+                    name="test_pipeline",
                     catalog=None,
                     database=None,
                     configuration={},
```

python/pyspark/pipelines/tests/test_init_cli.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -50,6 +50,7 @@ def test_init(self):
             with change_dir(Path(temp_dir) / project_name):
                 spec_path = find_pipeline_spec(Path.cwd())
                 spec = load_pipeline_spec(spec_path)
+                assert spec.name == project_name
                 registry = LocalGraphElementRegistry()
                 register_definitions(spec_path, registry, spec)
                 self.assertEqual(len(registry.datasets), 1)
```
