import shutil
import sys
import tempfile
+import oci

import pandas as pd
from joblib import dump

from ads.opctl import logger
from ads.common.model_export_util import prepare_generic_model
-from ads.opctl.operator.lowcode.common.utils import write_data, call_pandas_fsspec
-
+from ads.opctl.operator.lowcode.common.utils import write_data, write_simple_json
+from ads.opctl.operator.lowcode.common.utils import default_signer
from ..model.forecast_datasets import AdditionalData
from ..operator_config import ForecastOperatorSpec

+from oci.data_science import DataScienceClient, DataScienceClientCompositeOperations
+
+from oci.data_science.models import ModelConfigurationDetails, InstanceConfiguration, \
+    FixedSizeScalingPolicy, CategoryLogDetails, LogDetails, \
+    SingleModelDeploymentConfigurationDetails, CreateModelDeploymentDetails
+from ads.common.object_storage_details import ObjectStorageDetails
+

class ModelDeploymentManager:
    def __init__(self, spec: ForecastOperatorSpec, additional_data: AdditionalData, previous_model_version=None):
@@ -27,12 +35,19 @@ def __init__(self, spec: ForecastOperatorSpec, additional_data: AdditionalData,
        self.horizon = spec.horizon
        self.additional_data = additional_data.get_dict_by_series()
        self.model_obj = {}
-        self.display_name = spec.what_if_analysis.model_name
-        self.project_id = spec.what_if_analysis.project_id
-        self.compartment_id = spec.what_if_analysis.compartment_id
+        self.display_name = spec.what_if_analysis.model_display_name
+        self.project_id = spec.what_if_analysis.project_id if spec.what_if_analysis.project_id \
+            else os.environ.get('PROJECT_OCID')
+        self.compartment_id = spec.what_if_analysis.compartment_id if spec.what_if_analysis.compartment_id \
+            else os.environ.get('NB_SESSION_COMPARTMENT_OCID')
+        if self.project_id is None or self.compartment_id is None:
+            raise ValueError("Neither project_id nor compartment_id can be None.")
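+        # Assumption: PROJECT_OCID and NB_SESSION_COMPARTMENT_OCID are set inside OCI Data
+        # Science sessions, so these env fallbacks only work when running in the service.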
        self.path_to_artifact = f"{self.spec.output_directory.url}/artifacts/"
        self.pickle_file_path = f"{self.spec.output_directory.url}/model.pkl"
        self.model_version = previous_model_version + 1 if previous_model_version else 1
+        self.catalog_id = None
+        self.test_mode = os.environ.get("TEST_MODE", False)
+        self.deployment_info = {}

    def _sanity_test(self):
        """
@@ -51,8 +66,7 @@ def _sanity_test(self):
        date_col_format = self.spec.datetime_column.format
        sample_prediction_data[date_col_name] = sample_prediction_data[date_col_name].dt.strftime(date_col_format)
        sample_prediction_data.to_csv(temp_file.name, index=False)
-        additional_data_uri = "additional_data"
-        input_data = {additional_data_uri: {"url": temp_file.name}}
+        input_data = {"additional_data": {"url": temp_file.name}}
        prediction_test = predict(input_data, _)
        logger.info(f"prediction test completed with result: {prediction_test}")

@@ -80,8 +94,11 @@ def save_to_catalog(self):

        artifact_dict = {"spec": self.spec.to_dict(), "models": self.model_obj}
        dump(artifact_dict, os.path.join(self.path_to_artifact, "model.joblib"))
-        artifact = prepare_generic_model(self.path_to_artifact, function_artifacts=False, force_overwrite=True,
-                                         data_science_env=True)
+        artifact = prepare_generic_model(
+            self.path_to_artifact,
+            function_artifacts=False,
+            force_overwrite=True,
+            data_science_env=True)
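+        # prepare_generic_model (from ADS) scaffolds the artifact directory, including a
+        # score.py template and runtime.yaml, around the model.joblib dumped above.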

        self._copy_score_file()
        self._sanity_test()
@@ -92,29 +109,115 @@ def save_to_catalog(self):
        series = self.additional_data.keys()
        description = f"The object contains {len(series)} {self.model_name} models"

-        catalog_id = "None"
-        if not os.environ.get("TEST_MODE", False):
-            catalog_entry = artifact.save(display_name=self.display_name,
-                                          compartment_id=self.compartment_id,
-                                          project_id=self.project_id,
-                                          description=description)
-            catalog_id = catalog_entry.id
-
+        if not self.test_mode:
+            catalog_entry = artifact.save(
+                display_name=self.display_name,
+                compartment_id=self.compartment_id,
+                project_id=self.project_id,
+                description=description)
+            self.catalog_id = catalog_entry.id

        logger.info(f"Saved {self.model_name} version-v{self.model_version} to model catalog"
-                    f" with catalog id: {catalog_id}")
+                    f" with catalog id: {self.catalog_id}")

-        catalog_mapping = {"catalog_id": catalog_id, "series": list(series)}
+        self.deployment_info = {"catalog_id": self.catalog_id, "series": list(series)}
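+        # deployment_info is extended by create_deployment() and persisted by
+        # save_deployment_info() once the catalog save completes.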

-        write_data(
-            data=pd.DataFrame([catalog_mapping]),
-            filename=os.path.join(
-                self.spec.output_directory.url, "model_ids.csv"
-            ),
-            format="csv"
+    def create_deployment(self):
+        """Create a model deployment to serve the saved model."""
+
+        # create new model deployment
+        initial_shape = self.spec.what_if_analysis.model_deployment.initial_shape
+        name = self.spec.what_if_analysis.model_deployment.display_name
+        description = self.spec.what_if_analysis.model_deployment.description
+        auto_scaling_config = self.spec.what_if_analysis.model_deployment.auto_scaling
+
+        # if auto-scaling is configured, build a threshold-based autoscaling policy
+        if auto_scaling_config:
+            scaling_policy = oci.data_science.models.AutoScalingPolicy(
+                policy_type="AUTOSCALING",
+                auto_scaling_policies=[
+                    oci.data_science.models.ThresholdBasedAutoScalingPolicyDetails(
+                        auto_scaling_policy_type="THRESHOLD",
+                        rules=[
+                            oci.data_science.models.PredefinedMetricExpressionRule(
+                                metric_expression_rule_type="PREDEFINED_EXPRESSION",
+                                metric_type=auto_scaling_config.scaling_metric,
+                                scale_in_configuration=oci.data_science.models.PredefinedExpressionThresholdScalingConfiguration(
+                                    scaling_configuration_type="THRESHOLD",
+                                    threshold=auto_scaling_config.scale_in_threshold
+                                ),
+                                scale_out_configuration=oci.data_science.models.PredefinedExpressionThresholdScalingConfiguration(
+                                    scaling_configuration_type="THRESHOLD",
+                                    threshold=auto_scaling_config.scale_out_threshold
+                                )
+                            )],
+                        maximum_instance_count=auto_scaling_config.maximum_instance,
+                        minimum_instance_count=auto_scaling_config.minimum_instance,
+                        initial_instance_count=auto_scaling_config.minimum_instance)],
+                cool_down_in_seconds=auto_scaling_config.cool_down_in_seconds,
+                is_enabled=True)
+            logger.info(f"Using autoscaling {auto_scaling_config.scaling_metric} for creating MD")
+        else:
+            scaling_policy = FixedSizeScalingPolicy(instance_count=1)
+            logger.info("Using fixed size policy for creating MD")
+
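+        # Note: bandwidth_mbps below is hard-coded to 20 and not driven by the operator spec.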
+        model_configuration_details_object = ModelConfigurationDetails(
+            model_id=self.catalog_id,
+            instance_configuration=InstanceConfiguration(
+                instance_shape_name=initial_shape),
+            scaling_policy=scaling_policy,
+            bandwidth_mbps=20)
+
+        single_model_config = SingleModelDeploymentConfigurationDetails(
+            deployment_type='SINGLE_MODEL',
+            model_configuration_details=model_configuration_details_object
        )
-        return catalog_id

-    def create_deployment(self, deployment_config):
-        """Create a model deployment serving"""
-        pass
+        log_group = self.spec.what_if_analysis.model_deployment.log_group
+        log_id = self.spec.what_if_analysis.model_deployment.log_id
+
+        logs_configuration_details_object = CategoryLogDetails(
+            access=LogDetails(log_group_id=log_group,
+                              log_id=log_id),
+            predict=LogDetails(log_group_id=log_group,
+                               log_id=log_id))
+
+        model_deploy_configuration = CreateModelDeploymentDetails(
+            display_name=name,
+            description=description,
+            project_id=self.project_id,
+            compartment_id=self.compartment_id,
+            model_deployment_configuration_details=single_model_config,
+            category_log_details=logs_configuration_details_object)
+
+        if not self.test_mode:
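+            # Resource-principal auth assumes execution inside OCI (e.g. a job or notebook
+            # session); a config-file signer would be needed for local runs.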
+            auth = oci.auth.signers.get_resource_principals_signer()
+            data_science = DataScienceClient({}, signer=auth)
+            data_science_composite = DataScienceClientCompositeOperations(data_science)
+            model_deployment = data_science_composite.create_model_deployment_and_wait_for_state(
+                model_deploy_configuration,
+                wait_for_states=["SUCCEEDED", "FAILED"])
+            self.deployment_info['model_deployment_id'] = model_deployment.data.id
+            logger.info(f"deployment metadata: {model_deployment.data}")
+            md = data_science.get_model_deployment(model_deployment_id=model_deployment.data.resources[0].identifier)
+            endpoint_url = md.data.model_deployment_url
+            self.deployment_info['model_deployment_endpoint'] = f"{endpoint_url}/predict"
+
+    def save_deployment_info(self):
+        output_dir = self.spec.output_directory.url
+        if ObjectStorageDetails.is_oci_path(output_dir):
+            storage_options = default_signer()
+        else:
+            storage_options = {}
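+        # default_signer() hands OCI credentials to fsspec so write_data can target
+        # oci:// object storage paths; local filesystem paths need no storage options.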
+        write_data(
+            data=pd.DataFrame.from_dict(self.deployment_info),
+            filename=os.path.join(output_dir, "deployment_info.json"),
+            format="json",
+            storage_options=storage_options,
+            index=False,
+            indent=4,
+            orient="records"
+        )
+        write_simple_json(self.deployment_info, os.path.join(output_dir, "deployment_info.json"))
+        logger.info(f"Saved deployment info to {output_dir}")