Skip to content

Commit a7c7556

Browse files
authored
Added defined tags for data flow (#210)
2 parents eb56fd5 + be91cbb commit a7c7556

File tree

4 files changed

+122
-9
lines changed

4 files changed

+122
-9
lines changed

ads/jobs/builders/infrastructure/dataflow.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,8 @@ class DataFlow(Infrastructure):
384384
CONST_OCPUS = "ocpus"
385385
CONST_ID = "id"
386386
CONST_PRIVATE_ENDPOINT_ID = "private_endpoint_id"
387+
CONST_FREEFORM_TAGS = "freeform_tags"
388+
CONST_DEFINED_TAGS = "defined_tags"
387389

388390
attribute_map = {
389391
CONST_COMPARTMENT_ID: "compartmentId",
@@ -402,6 +404,8 @@ class DataFlow(Infrastructure):
402404
CONST_OCPUS: CONST_OCPUS,
403405
CONST_ID: CONST_ID,
404406
CONST_PRIVATE_ENDPOINT_ID: "privateEndpointId",
407+
CONST_FREEFORM_TAGS: "freeformTags",
408+
CONST_DEFINED_TAGS: "definedTags"
405409
}
406410

407411
def __init__(self, spec: dict = None, **kwargs):
@@ -414,7 +418,9 @@ def __init__(self, spec: dict = None, **kwargs):
414418
spec = {
415419
k: v
416420
for k, v in spec.items()
417-
if f"with_{camel_to_snake(k)}" in self.__dir__() and v is not None
421+
if (f"with_{camel_to_snake(k)}" in self.__dir__()
422+
or k in ("defined_tags", "freeform_tags"))
423+
and v is not None
418424
}
419425
defaults.update(spec)
420426
super().__init__(defaults, **kwargs)
@@ -775,9 +781,31 @@ def with_private_endpoint_id(self, private_endpoint_id: str) -> "DataFlow":
775781
the Data Flow instance itself
776782
"""
777783
return self.set_spec(self.CONST_PRIVATE_ENDPOINT_ID, private_endpoint_id)
784+
785+
def with_freeform_tag(self, **kwargs) -> "DataFlow":
    """Sets freeform tags on the Data Flow application.

    Parameters
    ----------
    kwargs
        Each keyword argument becomes one freeform tag: the argument
        name is the tag key and the argument value is the tag value.
        Calling this again replaces any previously set freeform tags.

    Returns
    -------
    DataFlow
        The DataFlow instance (self)
    """
    return self.set_spec(self.CONST_FREEFORM_TAGS, kwargs)
794+
795+
def with_defined_tag(self, **kwargs) -> "DataFlow":
    """Sets defined tags on the Data Flow application.

    Parameters
    ----------
    kwargs
        Each keyword argument becomes one defined-tag entry: the
        argument name is the tag namespace and the argument value is a
        dict mapping tag keys to tag values.
        Calling this again replaces any previously set defined tags.

    Returns
    -------
    DataFlow
        The DataFlow instance (self)
    """
    return self.set_spec(self.CONST_DEFINED_TAGS, kwargs)
778804

779805
def __getattr__(self, item):
    """Resolve builder-managed attributes from the underlying spec.

    Tag attributes (``freeform_tags``/``defined_tags``) are read from
    the spec directly. Any other attribute resolves when a matching
    ``with_<item>`` builder method exists, except the singular names
    ``defined_tag``/``freeform_tag`` — those match the builder methods
    but are not the keys the tags are stored under, so they are
    excluded.
    """
    if item in (self.CONST_DEFINED_TAGS, self.CONST_FREEFORM_TAGS):
        return self.get_spec(item)
    has_builder = f"with_{item}" in self.__dir__()
    if has_builder and item not in ("defined_tag", "freeform_tag"):
        return self.get_spec(item)
    raise AttributeError(f"Attribute {item} not found.")
783811

@@ -849,7 +877,8 @@ def create(self, runtime: DataFlowRuntime, **kwargs) -> "DataFlow":
849877
{
850878
"display_name": self.name,
851879
"file_uri": runtime.script_uri,
852-
"freeform_tags": runtime.freeform_tags,
880+
"freeform_tags": runtime.freeform_tags or self.freeform_tags,
881+
"defined_tags": runtime.defined_tags or self.defined_tags,
853882
"archive_uri": runtime.archive_uri,
854883
"configuration": runtime.configuration,
855884
}
@@ -915,6 +944,7 @@ def run(
915944
args: List[str] = None,
916945
env_vars: Dict[str, str] = None,
917946
freeform_tags: Dict[str, str] = None,
947+
defined_tags: Dict[str, Dict[str, object]] = None,
918948
wait: bool = False,
919949
**kwargs,
920950
) -> DataFlowRun:
@@ -932,6 +962,8 @@ def run(
932962
dictionary of environment variables (not used for data flow)
933963
freeform_tags: Dict[str, str], optional
934964
freeform tags
965+
defined_tags: Dict[str, Dict[str, object]], optional
966+
defined tags
935967
wait: bool, optional
936968
whether to wait for a run to terminate
937969
kwargs
@@ -950,7 +982,8 @@ def run(
950982
# Set default display_name if not specified - randomly generated easy to remember name generated
951983
payload["display_name"] = name if name else utils.get_random_name_for_resource()
952984
payload["arguments"] = args if args and len(args) > 0 else None
953-
payload["freeform_tags"] = freeform_tags
985+
payload["freeform_tags"] = freeform_tags or self.freeform_tags
986+
payload["defined_tags"] = defined_tags or self.defined_tags
954987
payload.pop("spark_version", None)
955988
logger.debug(f"Creating a DataFlow Run with payload {payload}")
956989
run = DataFlowRun(**payload).create()

docs/source/user_guide/apachespark/dataflow.rst

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,8 @@ You can set them using the ``with_{property}`` functions:
207207
- ``with_spark_version``
208208
- ``with_warehouse_bucket_uri``
209209
- ``with_private_endpoint_id`` (`doc <https://docs.oracle.com/en-us/iaas/data-flow/using/pe-allowing.htm#pe-allowing>`__)
210+
- ``with_defined_tag``
211+
- ``with_freeform_tag``
210212

211213
For more details, see `Data Flow class documentation <https://docs.oracle.com/en-us/iaas/tools/ads-sdk/latest/ads.jobs.html#module-ads.jobs.builders.infrastructure.dataflow>`__.
212214

@@ -229,10 +231,10 @@ create applications.
229231

230232
In the following "hello-world" example, ``DataFlow`` is populated with ``compartment_id``,
231233
``driver_shape``, ``driver_shape_config``, ``executor_shape``, ``executor_shape_config``
232-
and ``spark_version``. ``DataFlowRuntime`` is populated with ``script_uri`` and
233-
``script_bucket``. The ``script_uri`` specifies the path to the script. It can be
234-
local or remote (an Object Storage path). If the path is local, then
235-
``script_bucket`` must be specified additionally because Data Flow
234+
, ``spark_version``, ``defined_tags`` and ``freeform_tags``. ``DataFlowRuntime`` is
235+
populated with ``script_uri`` and ``script_bucket``. The ``script_uri`` specifies the
236+
path to the script. It can be local or remote (an Object Storage path). If the path
237+
is local, then ``script_bucket`` must be specified additionally because Data Flow
236238
requires a script to be available in Object Storage. ADS
237239
performs the upload step for you, as long as you give the bucket name
238240
or the Object Storage path prefix to upload the script. Either can be
@@ -272,6 +274,10 @@ accepted. In the next example, the prefix is given for ``script_bucket``.
272274
.with_executor_shape("VM.Standard.E4.Flex")
273275
.with_executor_shape_config(ocpus=4, memory_in_gbs=64)
274276
.with_spark_version("3.0.2")
277+
.with_defined_tag(
278+
**{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}}
279+
)
280+
.with_freeform_tag(test_freeform_key="test_freeform_value")
275281
)
276282
runtime_config = (
277283
DataFlowRuntime()
@@ -393,6 +399,10 @@ In the next example, ``archive_uri`` is given as an Object Storage location.
393399
"spark.driverEnv.myEnvVariable": "value1",
394400
"spark.executorEnv.myEnvVariable": "value2",
395401
})
402+
.with_defined_tag(
403+
**{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}}
404+
)
405+
.with_freeform_tag(test_freeform_key="test_freeform_value")
396406
)
397407
runtime_config = (
398408
DataFlowRuntime()
@@ -566,6 +576,11 @@ into the ``Job.from_yaml()`` function to build a Data Flow job:
566576
numExecutors: 1
567577
sparkVersion: 3.2.1
568578
privateEndpointId: <private_endpoint_ocid>
579+
definedTags:
580+
Oracle-Tags:
581+
CreatedBy: test_name@oracle.com
582+
freeformTags:
583+
test_freeform_key: test_freeform_value
569584
type: dataFlow
570585
name: dataflow_app_name
571586
runtime:
@@ -647,6 +662,12 @@ into the ``Job.from_yaml()`` function to build a Data Flow job:
647662
configuration:
648663
required: false
649664
type: dict
665+
definedTags:
666+
required: false
667+
type: dict
668+
freeformTags:
669+
required: false
670+
type: dict
650671
type:
651672
allowed:
652673
- dataFlow
@@ -694,7 +715,10 @@ into the ``Job.from_yaml()`` function to build a Data Flow job:
694715
configuration:
695716
required: false
696717
type: dict
697-
freeform_tag:
718+
definedTags:
719+
required: false
720+
type: dict
721+
freeformTags:
698722
required: false
699723
type: dict
700724
scriptBucket:

docs/source/user_guide/apachespark/quickstart.rst

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ followed by the spark version, 3.2.1.
4545
.with_executor_shape("VM.Standard.E4.Flex")
4646
.with_executor_shape_config(ocpus=4, memory_in_gbs=64)
4747
.with_spark_version("3.2.1")
48+
.with_defined_tag(
49+
**{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}}
50+
)
51+
.with_freeform_tag(test_freeform_key="test_freeform_value")
4852
)
4953
runtime_config = (
5054
DataFlowRuntime()
@@ -95,6 +99,11 @@ Assuming you have the following two files written in your current directory as `
9599
memory_in_gbs: 64
96100
sparkVersion: 3.2.1
97101
numExecutors: 1
102+
definedTags:
103+
Oracle-Tags:
104+
CreatedBy: test_name@oracle.com
105+
freeformTags:
106+
test_freeform_key: test_freeform_value
98107
type: dataFlow
99108
runtime:
100109
kind: runtime
@@ -185,6 +194,10 @@ From a Python Environment
185194
.with_executor_shape("VM.Standard.E4.Flex")
186195
.with_executor_shape_config(ocpus=4, memory_in_gbs=64)
187196
.with_spark_version("3.2.1")
197+
.with_defined_tag(
198+
**{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}}
199+
)
200+
.with_freeform_tag(test_freeform_key="test_freeform_value")
188201
)
189202
runtime_config = (
190203
DataFlowRuntime()
@@ -275,6 +288,11 @@ Again, assume you have the following two files written in your current directory
275288
memory_in_gbs: 64
276289
sparkVersion: 3.2.1
277290
numExecutors: 1
291+
definedTags:
292+
Oracle-Tags:
293+
CreatedBy: test_name@oracle.com
294+
freeformTags:
295+
test_freeform_key: test_freeform_value
278296
type: dataFlow
279297
runtime:
280298
kind: runtime

tests/unitary/default_setup/jobs/test_jobs_dataflow.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,13 +320,27 @@ def df(self):
320320
2
321321
).with_private_endpoint_id(
322322
"test_private_endpoint"
323+
).with_freeform_tag(
324+
test_freeform_tags_key="test_freeform_tags_value",
325+
).with_defined_tag(
326+
test_defined_tags_namespace={
327+
"test_defined_tags_key": "test_defined_tags_value"
328+
}
323329
)
324330
return df
325331

326332
def test_create_with_builder_pattern(self, mock_to_dict, mock_client, df):
327333
assert df.language == "PYTHON"
328334
assert df.spark_version == "3.2.1"
329335
assert df.num_executors == 2
336+
assert df.freeform_tags == {
337+
"test_freeform_tags_key": "test_freeform_tags_value"
338+
}
339+
assert df.defined_tags == {
340+
"test_defined_tags_namespace": {
341+
"test_defined_tags_key": "test_defined_tags_value"
342+
}
343+
}
330344

331345
rt = (
332346
DataFlowRuntime()
@@ -335,9 +349,25 @@ def test_create_with_builder_pattern(self, mock_to_dict, mock_client, df):
335349
.with_custom_conda(
336350
"oci://my_bucket@my_namespace/conda_environments/cpu/PySpark 3.0 and Data Flow/5.0/pyspark30_p37_cpu_v5"
337351
)
352+
.with_freeform_tag(
353+
test_freeform_tags_runtime_key="test_freeform_tags_runtime_value"
354+
)
355+
.with_defined_tag(
356+
test_defined_tags_namespace={
357+
"test_defined_tags_runtime_key": "test_defined_tags_runtime_value"
358+
}
359+
)
338360
.with_overwrite(True)
339361
)
340362
assert rt.overwrite == True
363+
assert rt.freeform_tags == {
364+
"test_freeform_tags_runtime_key": "test_freeform_tags_runtime_value"
365+
}
366+
assert rt.defined_tags == {
367+
"test_defined_tags_namespace": {
368+
"test_defined_tags_runtime_key": "test_defined_tags_runtime_value"
369+
}
370+
}
341371

342372
with patch.object(DataFlowApp, "client", mock_client):
343373
with patch.object(DataFlowApp, "to_dict", mock_to_dict):
@@ -429,6 +459,14 @@ def test_to_and_from_dict(self, df):
429459
assert df_dict["spec"]["privateEndpointId"] == "test_private_endpoint"
430460
assert df_dict["spec"]["driverShapeConfig"] == {"memoryInGBs": 1, "ocpus": 16}
431461
assert df_dict["spec"]["executorShapeConfig"] == {"memoryInGBs": 1, "ocpus": 16}
462+
assert df_dict["spec"]["freeformTags"] == {
463+
"test_freeform_tags_key": "test_freeform_tags_value"
464+
}
465+
assert df_dict["spec"]["definedTags"] == {
466+
"test_defined_tags_namespace": {
467+
"test_defined_tags_key": "test_defined_tags_value"
468+
}
469+
}
432470

433471
df_dict["spec"].pop("language")
434472
df_dict["spec"].pop("numExecutors")

0 commit comments

Comments
 (0)