Skip to content

Commit 3a38dcf

Browse files
authored
Added support for mounting file storage for oci job (#139)
2 parents a995293 + 60b30e8 commit 3a38dcf

File tree

3 files changed

+602
-0
lines changed

3 files changed

+602
-0
lines changed

ads/common/dsc_file_system.py

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8; -*-
3+
4+
# Copyright (c) 2023 Oracle and/or its affiliates.
5+
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
6+
import ads
7+
import oci
8+
import ipaddress
9+
10+
from dataclasses import dataclass
11+
12+
FILE_STORAGE_TYPE = "FILE_STORAGE"
13+
14+
15+
@dataclass
16+
class DSCFileSystem:
17+
18+
src: str = None
19+
dest: str = None
20+
storage_type: str = None
21+
destination_directory_name: str = None
22+
23+
def update_to_dsc_model(self) -> dict:
24+
"""Updates arguments to dsc model.
25+
26+
Returns
27+
-------
28+
dict:
29+
A dictionary of arguments.
30+
"""
31+
pass
32+
33+
@classmethod
34+
def update_from_dsc_model(cls, dsc_model) -> dict:
35+
"""Updates arguments from dsc model.
36+
37+
Parameters
38+
----------
39+
dsc_model: oci.data_science.models.JobStorageMountConfigurationDetails
40+
An instance of oci.data_science.models.JobStorageMountConfigurationDetails.
41+
42+
Returns
43+
-------
44+
dict
45+
A dictionary of arguments.
46+
"""
47+
pass
48+
49+
50+
@dataclass
51+
class OCIFileStorage(DSCFileSystem):
52+
53+
mount_target_id: str = None
54+
mount_target: str = None
55+
export_id: str = None
56+
export_path: str = None
57+
storage_type: str = FILE_STORAGE_TYPE
58+
59+
def update_to_dsc_model(self) -> dict:
60+
"""Updates arguments to dsc model.
61+
62+
Returns
63+
-------
64+
dict:
65+
A dictionary of arguments.
66+
"""
67+
arguments = {
68+
"destinationDirectoryName" : self.dest,
69+
"storageType" : self.storage_type
70+
}
71+
72+
self._get_mount_target_and_export_ids(arguments)
73+
74+
return arguments
75+
76+
def _get_mount_target_and_export_ids(self, arguments: dict):
77+
"""Gets the mount target id and export id from src.
78+
79+
Parameters
80+
----------
81+
arguments: dict
82+
A dictionary of arguments.
83+
"""
84+
resource_client = oci.resource_search.ResourceSearchClient(**ads.auth.default_signer())
85+
src_list = self.src.split(":")
86+
first_segment = src_list[0]
87+
second_segment = src_list[1]
88+
89+
if first_segment.startswith("ocid") and "mounttarget" in first_segment:
90+
arguments["mountTargetId"] = first_segment
91+
else:
92+
ip_resource = resource_client.search_resources(
93+
search_details=oci.resource_search.models.FreeTextSearchDetails(
94+
text=first_segment,
95+
matching_context_type="NONE"
96+
)
97+
).data.items
98+
99+
ip_resource = sorted(ip_resource, key=lambda resource_summary: resource_summary.time_created)
100+
101+
if not ip_resource or not hasattr(ip_resource[-1], "identifier"):
102+
raise ValueError(f"Can't find the identifier from ip {first_segment}. Specify a valid `src`.")
103+
104+
mount_target_resource = resource_client.search_resources(
105+
search_details=oci.resource_search.models.FreeTextSearchDetails(
106+
text=ip_resource[-1].identifier,
107+
matching_context_type="NONE"
108+
)
109+
).data.items
110+
111+
mount_targets = [
112+
mount_target.identifier
113+
for mount_target in mount_target_resource
114+
if mount_target.resource_type == "MountTarget"
115+
]
116+
if len(mount_targets) == 0:
117+
raise ValueError(
118+
f"No `mount_target_id` found under ip {first_segment}. Specify a valid `src`."
119+
)
120+
if len(mount_targets) > 1:
121+
raise ValueError(
122+
f"Multiple `mount_target_id` found under ip {first_segment}. Specify the `mount_target_id` in `src` instead."
123+
)
124+
125+
arguments["mountTargetId"] = mount_targets[0]
126+
127+
if second_segment.startswith("ocid") and "export" in second_segment:
128+
arguments["exportId"] = second_segment
129+
else:
130+
export_resource = resource_client.search_resources(
131+
search_details=oci.resource_search.models.FreeTextSearchDetails(
132+
text=second_segment,
133+
matching_context_type="NONE"
134+
)
135+
).data.items
136+
137+
exports = [
138+
export.identifier
139+
for export in export_resource
140+
if export.resource_type == "Export"
141+
]
142+
if len(exports) == 0:
143+
raise ValueError(
144+
f"No `export_id` found with `export_path` {second_segment}. Specify a valid `src`."
145+
)
146+
if len(exports) > 1:
147+
raise ValueError(
148+
f"Multiple `export_id` found with `export_path` {second_segment}. Specify the `export_id` in `src` instead."
149+
)
150+
151+
arguments["exportId"] = exports[0]
152+
153+
@classmethod
154+
def update_from_dsc_model(cls, dsc_model) -> dict:
155+
"""Updates arguments from dsc model.
156+
157+
Parameters
158+
----------
159+
dsc_model: oci.data_science.models.JobStorageMountConfigurationDetails
160+
An instance of oci.data_science.models.JobStorageMountConfigurationDetails.
161+
162+
Returns
163+
-------
164+
dict
165+
A dictionary of arguments.
166+
"""
167+
if not dsc_model.mount_target_id:
168+
raise ValueError(
169+
"Missing parameter `mount_target_id` from service. Check service log to see the error."
170+
)
171+
if not dsc_model.export_id:
172+
raise ValueError(
173+
"Missing parameter `export_id` from service. Check service log to see the error."
174+
)
175+
if not dsc_model.destination_directory_name:
176+
raise ValueError(
177+
"Missing parameter `destination_directory_name` from service. Check service log to see the error."
178+
)
179+
180+
return {
181+
"src" : f"{dsc_model.mount_target_id}:{dsc_model.export_id}",
182+
"dest" : dsc_model.destination_directory_name
183+
}
184+
185+
186+
class DSCFileSystemManager:
187+
188+
@classmethod
189+
def initialize(cls, arguments: dict) -> dict:
190+
"""Initialize and update arguments to dsc model.
191+
192+
Parameters
193+
----------
194+
arguments: dict
195+
A dictionary of arguments.
196+
"""
197+
if "src" not in arguments:
198+
raise ValueError(
199+
"Parameter `src` is required for mounting file storage system."
200+
)
201+
202+
if "dest" not in arguments:
203+
raise ValueError(
204+
"Parameter `dest` is required for mounting file storage system."
205+
)
206+
207+
first_segment = arguments["src"].split(":")[0]
208+
# case <mount_target_id>:<export_id>
209+
if first_segment.startswith("ocid"):
210+
return OCIFileStorage(**arguments).update_to_dsc_model()
211+
212+
# case <ip_address>:<export_path>
213+
try:
214+
ipaddress.IPv4Network(first_segment)
215+
return OCIFileStorage(**arguments).update_to_dsc_model()
216+
except:
217+
pass
218+
219+
raise ValueError("Invalid dict for mounting file systems. Specify a valid one.")

ads/jobs/builders/infrastructure/dsc_job.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import datetime
99
import logging
10+
import oci
1011
import os
1112
import time
1213
import traceback
@@ -34,10 +35,17 @@
3435
from ads.jobs.builders.runtimes.container_runtime import ContainerRuntime
3536
from ads.jobs.builders.runtimes.python_runtime import GitPythonRuntime
3637

38+
from ads.common.dsc_file_system import (
39+
OCIFileStorage,
40+
DSCFileSystemManager
41+
)
42+
3743
logger = logging.getLogger(__name__)
3844

3945
SLEEP_INTERVAL = 3
4046
WAIT_SECONDS_AFTER_FINISHED = 90
47+
MAXIMUM_MOUNT_COUNT = 5
48+
FILE_STORAGE_TYPE = "FILE_STORAGE"
4149

4250

4351
class DSCJob(OCIDataScienceMixin, oci.data_science.models.Job):
@@ -833,6 +841,13 @@ class DataScienceJob(Infrastructure):
833841
.with_shape_config_details(memory_in_gbs=16, ocpus=1)
834842
# Minimum/Default block storage size is 50 (GB).
835843
.with_block_storage_size(50)
844+
# A list of file systems to be mounted
845+
.with_storage_mount(
846+
{
847+
"src" : "<mount_target_ip_address>:<export_path>",
848+
"dest" : "<destination_directory_name>"
849+
}
850+
)
836851
)
837852
838853
"""
@@ -850,6 +865,7 @@ class DataScienceJob(Infrastructure):
850865
CONST_OCPUS = "ocpus"
851866
CONST_LOG_ID = "logId"
852867
CONST_LOG_GROUP_ID = "logGroupId"
868+
CONST_STORAGE_MOUNT = "storageMount"
853869

854870
attribute_map = {
855871
CONST_PROJECT_ID: "project_id",
@@ -863,6 +879,7 @@ class DataScienceJob(Infrastructure):
863879
CONST_SHAPE_CONFIG_DETAILS: "shape_config_details",
864880
CONST_LOG_ID: "log_id",
865881
CONST_LOG_GROUP_ID: "log_group_id",
882+
CONST_STORAGE_MOUNT: "storage_mount",
866883
}
867884

868885
shape_config_details_attribute_map = {
@@ -888,6 +905,8 @@ class DataScienceJob(Infrastructure):
888905
v.split(".", maxsplit=1)[-1]: k for k, v in payload_attribute_map.items()
889906
}
890907

908+
storage_mount_type_dict = {FILE_STORAGE_TYPE: OCIFileStorage}
909+
891910
@staticmethod
892911
def standardize_spec(spec):
893912
if not spec:
@@ -1212,6 +1231,46 @@ def log_group_id(self) -> str:
12121231
"""
12131232
return self.get_spec(self.CONST_LOG_GROUP_ID)
12141233

1234+
def with_storage_mount(
1235+
self, *storage_mount: List[dict]
1236+
) -> DataScienceJob:
1237+
"""Sets the file systems to be mounted for the data science job.
1238+
A maximum number of 5 file systems are allowed to be mounted for a single data science job.
1239+
1240+
Parameters
1241+
----------
1242+
storage_mount : List[dict]
1243+
A list of file systems to be mounted.
1244+
1245+
Returns
1246+
-------
1247+
DataScienceJob
1248+
The DataScienceJob instance (self)
1249+
"""
1250+
storage_mount_list = []
1251+
for item in storage_mount:
1252+
if not isinstance(item, dict):
1253+
raise ValueError(
1254+
"Parameter `storage_mount` should be a list of dictionaries."
1255+
)
1256+
storage_mount_list.append(item)
1257+
if len(storage_mount_list) > MAXIMUM_MOUNT_COUNT:
1258+
raise ValueError(
1259+
f"A maximum number of {MAXIMUM_MOUNT_COUNT} file systems are allowed to be mounted at this time for a job."
1260+
)
1261+
return self.set_spec(self.CONST_STORAGE_MOUNT, storage_mount_list)
1262+
1263+
@property
1264+
def storage_mount(self) -> List[dict]:
1265+
"""Files systems that have been mounted for the data science job
1266+
1267+
Returns
1268+
-------
1269+
list
1270+
A list of file systems that have been mounted
1271+
"""
1272+
return self.get_spec(self.CONST_STORAGE_MOUNT, [])
1273+
12151274
def _prepare_log_config(self) -> dict:
12161275
if not self.log_group_id and not self.log_id:
12171276
return None
@@ -1285,6 +1344,42 @@ def _update_from_dsc_model(
12851344
sub_spec[sub_infra_attr] = sub_value
12861345
if sub_spec:
12871346
self._spec[infra_attr] = sub_spec
1347+
1348+
self._update_storage_mount_from_dsc_model(dsc_job, overwrite)
1349+
return self
1350+
1351+
def _update_storage_mount_from_dsc_model(
1352+
self, dsc_job: oci.data_science.models.Job, overwrite: bool = True
1353+
) -> DataScienceJob:
1354+
"""Update the mount storage properties from an OCI data science job model.
1355+
1356+
Parameters
1357+
----------
1358+
dsc_job: oci.data_science.models.Job
1359+
An OCI data science job model.
1360+
1361+
overwrite: bool
1362+
Whether to overwrite the existing values.
1363+
If this is set to False, only the empty/None properties will be updated.
1364+
1365+
Returns
1366+
-------
1367+
DataScienceJob
1368+
The DataScienceJob instance (self)
1369+
"""
1370+
storage_mount_list = get_value(
1371+
dsc_job, "job_storage_mount_configuration_details_list"
1372+
)
1373+
if storage_mount_list:
1374+
storage_mount = [
1375+
self.storage_mount_type_dict[
1376+
file_system.storage_type
1377+
].update_from_dsc_model(file_system)
1378+
for file_system in storage_mount_list
1379+
if file_system.storage_type in self.storage_mount_type_dict
1380+
]
1381+
if overwrite or not self.get_spec(self.CONST_STORAGE_MOUNT):
1382+
self.set_spec(self.CONST_STORAGE_MOUNT, storage_mount)
12881383
return self
12891384

12901385
def _update_job_infra(self, dsc_job: DSCJob) -> DataScienceJob:
@@ -1321,6 +1416,17 @@ def _update_job_infra(self, dsc_job: DSCJob) -> DataScienceJob:
13211416
dsc_job.job_infrastructure_configuration_details[
13221417
"jobInfrastructureType"
13231418
] = JobInfrastructureConfigurationDetails.JOB_INFRASTRUCTURE_TYPE_STANDALONE
1419+
1420+
if self.storage_mount:
1421+
if not hasattr(
1422+
oci.data_science.models, "JobStorageMountConfigurationDetails"
1423+
):
1424+
raise EnvironmentError(
1425+
"Storage mount hasn't been supported in the current OCI SDK installed."
1426+
)
1427+
dsc_job.job_storage_mount_configuration_details_list = [
1428+
DSCFileSystemManager.initialize(file_system) for file_system in self.storage_mount
1429+
]
13241430
return self
13251431

13261432
def build(self) -> DataScienceJob:

0 commit comments

Comments
 (0)