Skip to content

Commit 1099bc1

Browse files
committed
deployment dev changes
1 parent 1c6b196 commit 1099bc1

File tree

4 files changed

+214
-35
lines changed

4 files changed

+214
-35
lines changed

.github/workflows/snowpark-ci-cd.yml

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -265,16 +265,27 @@ jobs:
265265
local component_name=$2
266266
local component_type=$3
267267
268-
echo "⏳ Deploying $component_name..."
268+
# Check if component files have changed since last deployment
269+
echo "Checking for changes in $component_path..."
269270
270-
# Use our Python deployment script with additional logging
271-
PYTHONPATH=$PYTHONPATH:$(pwd) python -u scripts/snowflake_deployer.py deploy --profile $CONN_PROFILE --path "$component_path" --name "$component_name" --type "$component_type"
271+
# Get the last commit hash where this component was changed
272+
local last_change=$(git log -n 1 --format=format:%H --full-diff -- "$component_path")
272273
273-
if [ $? -eq 0 ]; then
274-
echo "✅ Successfully deployed $component_name"
274+
# If we're on main branch or there are changes in the component files, deploy
275+
if [[ -n "$last_change" && $(git rev-list HEAD^..HEAD | grep -c $last_change) -gt 0 ]]; then
276+
echo "⏳ Deploying $component_name (changes detected)..."
277+
278+
# Use our Python deployment script with additional logging
279+
PYTHONPATH=$PYTHONPATH:$(pwd) python -u scripts/snowflake_deployer.py deploy --profile $CONN_PROFILE --path "$component_path" --name "$component_name" --type "$component_type"
280+
281+
if [ $? -eq 0 ]; then
282+
echo "✅ Successfully deployed $component_name"
283+
else
284+
echo "❌ Deploy failed for $component_name"
285+
exit 1
286+
fi
275287
else
276-
echo "❌ Deploy failed for $component_name"
277-
exit 1
288+
echo "⏭️ Skipping $component_name (no changes detected)"
278289
fi
279290
}
280291
@@ -290,20 +301,4 @@ jobs:
290301
deploy_component "udfs_and_spoc/co2_harmonized_sp" "HARMONIZE_CO2_DATA" "procedure"
291302
deploy_component "udfs_and_spoc/co2_analytical_sp" "ANALYZE_CO2_DATA" "procedure"
292303
293-
echo "🎉 All components deployed successfully!"
294-
295-
- name: Orchestrate Tasks
296-
run: |
297-
echo "⏳ Orchestrating tasks for $ENVIRONMENT environment..."
298-
299-
# Use our Python script to execute SQL
300-
python scripts/snowflake_deployer.py sql --profile $CONN_PROFILE --file "scripts/orchestrate_tasks_${ENVIRONMENT}.sql"
301-
302-
if [ $? -eq 0 ]; then
303-
echo "✅ Tasks successfully orchestrated for $ENVIRONMENT environment"
304-
else
305-
echo "❌ Task orchestration failed for $ENVIRONMENT environment"
306-
exit 1
307-
fi
308-
309-
echo "🏁 Deployment complete!"
304+
echo "🎉 All components processed successfully!"

scripts/check_and_fix_udf.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import os
2+
import sys
3+
import argparse
4+
5+
def fix_udf_function(udf_path):
    """
    Check and fix the UDF function signature to work with Snowflake.

    If ``function.py`` inside ``udf_path`` defines
    ``def main(session, input_data...``, that function is renamed to
    ``main_with_session`` (its signature and body are left untouched) and a
    new single-argument ``main(input_data)`` wrapper is appended to the end
    of the file. The wrapper obtains a session via
    ``snowflake.snowpark.context.get_active_session()`` and forwards the call
    to ``main_with_session``.

    The pristine file is copied to ``function.py.bak`` only when a rewrite is
    actually performed, so running the script a second time never overwrites
    the backup with already-modified content.

    Args:
        udf_path: Directory expected to contain ``function.py``.

    Returns:
        True if ``function.py`` was rewritten; False if the file is missing,
        needs no change, or an I/O / decoding error occurred (in which case
        the backup written during this call, if any, is restored).
    """
    function_file = os.path.join(udf_path, "function.py")

    if not os.path.exists(function_file):
        print(f"Error: Function file not found at: {function_file}")
        return False

    backup_file = function_file + ".bak"
    session_signature = "def main(session, input_data"
    # Only restore from a backup created by THIS call; a leftover .bak from an
    # earlier run may not match the current file.
    backup_written = False

    try:
        # Python source files are UTF-8 by default, so read/write them as such
        # instead of depending on the platform locale.
        with open(function_file, 'r', encoding="utf-8") as f:
            content = f.read()

        # Guard clause: nothing to do (and nothing to back up) unless the
        # handler takes a session parameter.
        if session_signature not in content:
            print("No session parameter detected, no changes needed")
            return False

        print("Found Snowpark UDF with session parameter")

        # Back up the untouched original before modifying anything.
        with open(backup_file, 'w', encoding="utf-8") as f:
            f.write(content)
        backup_written = True

        # Rename the original handler in place. Only the function name is
        # replaced, so the rest of its signature ("...):") and its body stay
        # syntactically intact.
        renamed = content.replace(
            session_signature,
            "def main_with_session(session, input_data",
        )

        # Append a complete, self-contained wrapper at module level rather
        # than splicing lines into an existing function body.
        wrapper = (
            "\n\n"
            "# Wrapper function that Snowflake calls directly - gets session and passes to original\n"
            "def main(input_data):\n"
            "    # Get session from snowflake context\n"
            "    from snowflake.snowpark.context import get_active_session\n"
            "    session = get_active_session()\n"
            "    # Call the original function with session\n"
            "    return main_with_session(session, input_data)\n"
        )
        modified_content = renamed.rstrip() + "\n" + wrapper

        # Write the modified content
        with open(function_file, 'w', encoding="utf-8") as f:
            f.write(modified_content)

        print(f"✅ Updated {function_file} with wrapper function")
        print(f"Original file backed up to {backup_file}")
        return True

    except (OSError, UnicodeError) as e:
        print(f"Error fixing UDF: {e}")
        # Try to restore the backup, but only one we know is fresh.
        if backup_written:
            print("Restoring backup...")
            try:
                with open(backup_file, 'r', encoding="utf-8") as f:
                    original = f.read()
                with open(function_file, 'w', encoding="utf-8") as f:
                    f.write(original)
            except OSError as restore_error:
                print(f"Error restoring backup: {restore_error}")
        return False
89+
90+
if __name__ == "__main__":
    # Command-line entry point: takes one positional argument, the UDF
    # directory, and runs the signature fix on it. The boolean result of
    # fix_udf_function is not used here.
    cli = argparse.ArgumentParser(
        description='Fix UDF function for Snowflake compatibility',
    )
    cli.add_argument(
        'udf_path',
        help='Path to UDF directory containing function.py',
    )
    fix_udf_function(cli.parse_args().udf_path)

scripts/external_api_integration.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
-- Step 1: Create the network rule for API Gateway access
22
USE ROLE ACCOUNTADMIN;
3-
USE DATABASE CO2_DB_PROD;
3+
USE DATABASE CO2_DB_DEV;
44
USE SCHEMA EXTERNAL;
55
CREATE OR REPLACE NETWORK RULE co2_lambda_api_rule
66
MODE = 'EGRESS'

scripts/snowflake_deployer.py

Lines changed: 99 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,82 @@ def deploy_component(profile_name, component_path, component_name, component_typ
180180
code_dir = os.path.join(component_path, subdirs[0])
181181
logger.info(f"Using actual code directory: {code_dir}")
182182

183+
# Check for UDF function signature to automatically fix issues
184+
function_file = os.path.join(code_dir, "function.py")
185+
has_session_param = False
186+
187+
if os.path.exists(function_file):
188+
try:
189+
with open(function_file, 'r') as f:
190+
content = f.read()
191+
192+
# Check if we need to fix the function signature for UDFs with session parameters
193+
if component_type.lower() == "udf" and "def main(session, input_data" in content:
194+
logger.info("Detected UDF with session parameter - applying automatic fix")
195+
has_session_param = True
196+
197+
# Check if we need to automatically modify UDF with wrapper function
198+
if "def main_with_session(" not in content:
199+
# Create a backup of the original file
200+
backup_file = function_file + ".bak"
201+
with open(backup_file, 'w') as f:
202+
f.write(content)
203+
204+
# Modify the content to include a wrapper
205+
modified_content = content.replace(
206+
"def main(session, input_data",
207+
"""# Original function
208+
def main_with_session(session, input_data
209+
210+
# Wrapper function that Snowflake calls directly
211+
def main(input_data"""
212+
)
213+
214+
# Add the wrapper implementation at the end
215+
indent = " " # Default indentation
216+
lines = modified_content.split("\n")
217+
# Find where to insert wrapper code
218+
insert_pos = len(lines) # Default to end of file
219+
for i in range(len(lines)):
220+
if lines[i].strip() == "def main(input_data":
221+
# Find indentation level and next non-empty line
222+
for j in range(i+1, len(lines)):
223+
if lines[j].strip():
224+
indent = lines[j].split(lines[j].lstrip())[0]
225+
break
226+
227+
# Add wrapper implementation
228+
wrapper_code = [
229+
f"{indent}# Get session from Snowflake context",
230+
f"{indent}from snowflake.snowpark.context import get_active_session",
231+
f"{indent}session = get_active_session()",
232+
f"{indent}# Call the original function with session",
233+
f"{indent}return main_with_session(session, input_data)"
234+
]
235+
236+
# Find the position to insert the wrapper code
237+
for i in range(len(lines)-1, -1, -1):
238+
if "def main(input_data" in lines[i]:
239+
# Find where the function body ends
240+
func_indent = lines[i+1].split(lines[i+1].lstrip())[0]
241+
for j in range(i+1, len(lines)):
242+
if j == len(lines)-1 or (lines[j] and not lines[j].startswith(func_indent)):
243+
insert_pos = j
244+
break
245+
246+
# Insert wrapper code at appropriate position
247+
for code_line in wrapper_code:
248+
lines.insert(insert_pos, code_line)
249+
insert_pos += 1
250+
251+
# Write modified content back
252+
with open(function_file, 'w') as f:
253+
f.write("\n".join(lines))
254+
255+
logger.info(f"Modified UDF function to handle session parameter")
256+
except Exception as e:
257+
logger.warning(f"Could not check/fix function signature: {str(e)}")
258+
183259
# Log directory contents
184260
logger.info(f"Component directory structure:")
185261
for root, dirs, files in os.walk(component_path):
@@ -217,16 +293,29 @@ def deploy_component(profile_name, component_path, component_name, component_typ
217293
import_path = f"@{stage_name}/{component_name.replace(' ', '_')}/{zip_filename}"
218294

219295
if component_type.lower() == "udf":
220-
# For Snowpark UDFs that use session parameter
221-
sql = f"""
222-
CREATE OR REPLACE FUNCTION {component_name.replace(' ', '_')}(input_data VARIANT)
223-
RETURNS VARIANT
224-
LANGUAGE PYTHON
225-
RUNTIME_VERSION=3.8
226-
PACKAGES = ('snowflake-snowpark-python')
227-
IMPORTS = ('{import_path}')
228-
HANDLER = 'function.main'
229-
"""
296+
# For Snowpark UDFs - different SQL based on parameter signature
297+
if has_session_param:
298+
# For UDFs with session parameter (Snowpark style)
299+
sql = f"""
300+
CREATE OR REPLACE FUNCTION {component_name.replace(' ', '_')}(input_data VARIANT)
301+
RETURNS VARIANT
302+
LANGUAGE PYTHON
303+
RUNTIME_VERSION=3.8
304+
PACKAGES = ('snowflake-snowpark-python')
305+
HANDLER = 'function.main'
306+
IMPORTS = ('{import_path}')
307+
"""
308+
else:
309+
# For basic UDFs without session parameter
310+
sql = f"""
311+
CREATE OR REPLACE FUNCTION {component_name.replace(' ', '_')}(input_data VARIANT)
312+
RETURNS VARIANT
313+
LANGUAGE PYTHON
314+
RUNTIME_VERSION=3.8
315+
PACKAGES = ('snowflake-snowpark-python')
316+
IMPORTS = ('{import_path}')
317+
HANDLER = 'function.main'
318+
"""
230319
else: # procedure
231320
sql = f"""
232321
CREATE OR REPLACE PROCEDURE {component_name.replace(' ', '_')}()

0 commit comments

Comments
 (0)