 import logging
 import os
 import re
+import tempfile
+import json
 from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypedDict, Union, cast

 from openai import OpenAI, AzureOpenAI
@@ -611,13 +613,32 @@ def _apply_target_to_data(
             category=ErrorCategory.FAILED_EXECUTION,
             blame=ErrorBlame.USER_ERROR,
         )
+
+    # Log a warning if some rows failed
+    failed_lines = run_summary.get("failed_lines", 0)
+    completed_lines = run_summary["completed_lines"]
+    total_lines = failed_lines + completed_lines
+
+    if failed_lines > 0:
+        LOGGER.warning(
+            f"Target function completed {completed_lines} out of {total_lines} rows. "
+            f"{failed_lines} rows failed and will be filled with NaN values."
+        )
+
     # Remove input and output prefix
     generated_columns = {
         col[len(Prefixes.OUTPUTS) :] for col in target_output.columns if col.startswith(Prefixes.OUTPUTS)
     }
     # Sort output by line numbers
     target_output.set_index(f"inputs.{LINE_NUMBER}", inplace=True)
     target_output.sort_index(inplace=True)
+
+    initial_data_with_line_numbers = initial_data.copy()
+    initial_data_with_line_numbers[LINE_NUMBER] = range(len(initial_data))
+
+    complete_index = initial_data_with_line_numbers[LINE_NUMBER]
+    target_output = target_output.reindex(complete_index)
+
     target_output.reset_index(inplace=True, drop=False)
     # target_output contains only input columns, taken by function,
     # so we need to concatenate it to the input data frame.
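The reindex call added above is what backfills the failed rows: after sorting by line number, reindexing target_output against the complete 0..N-1 line-number index inserts an all-NaN row for every line the target never completed. A minimal pandas sketch of that behavior (column names are illustrative):

    import pandas as pd

    # Three input rows, but the target only produced output for lines 0 and 2.
    target_output = pd.DataFrame(
        {"inputs.line_number": [0, 2], "outputs.response": ["a", "c"]}
    ).set_index("inputs.line_number")

    # Reindexing against the full line-number range fills line 1 with NaN.
    print(target_output.reindex(pd.RangeIndex(3)))
    #   outputs.response
    # 0                a
    # 1              NaN
    # 2                c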
@@ -626,8 +647,8 @@ def _apply_target_to_data(
     # Rename outputs columns to __outputs
     rename_dict = {col: col.replace(Prefixes.OUTPUTS, Prefixes.TSG_OUTPUTS) for col in target_output.columns}
     target_output.rename(columns=rename_dict, inplace=True)
-    # Concatenate output to input
-    target_output = pd.concat([target_output, initial_data], axis=1)
+    # Concatenate output to input - now both dataframes have the same number of rows
+    target_output = pd.concat([initial_data, target_output], axis=1)

     return target_output, generated_columns, run

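A note on the reordered concat: pd.concat with axis=1 aligns rows by index, so this only works because both frames now have one row per input line and share the same default RangeIndex after reset_index. A small sketch with hypothetical columns:

    import pandas as pd

    inputs = pd.DataFrame({"query": ["q0", "q1", "q2"]})
    outputs = pd.DataFrame({"__outputs.response": ["a", None, "c"]})

    # Same length, same RangeIndex: columns are placed side by side,
    # inputs first, with no rows dropped or duplicated.
    print(pd.concat([inputs, outputs], axis=1))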
@@ -645,7 +666,7 @@ def _process_column_mappings(
 
     processed_config: Dict[str, Dict[str, str]] = {}
 
-    expected_references = re.compile(r"^\$\{(target|data)\.[a-zA-Z0-9_]+\}$")
+    expected_references = re.compile(r"^\$\{(target|data)\.([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+)*)\}$")
 
     if column_mapping:
         for evaluator, mapping_config in column_mapping.items():
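The widened pattern accepts dotted, multi-segment column references such as ${data.item.context}, which the old single-segment pattern rejected, while still requiring the target or data prefix. A quick check of both behaviors:

    import re

    expected_references = re.compile(r"^\$\{(target|data)\.([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+)*)\}$")

    for ref in ("${data.query}", "${data.item.context}", "${run.outputs.response}"):
        print(ref, bool(expected_references.match(ref)))
    # ${data.query} True
    # ${data.item.context} True   (rejected by the previous pattern)
    # ${run.outputs.response} False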
@@ -1013,17 +1034,50 @@ def _preprocess_data(
             target, batch_run_data, batch_run_client, input_data_df, evaluation_name, **kwargs
         )
 
-        for evaluator_name, mapping in column_mapping.items():
-            mapped_to_values = set(mapping.values())
-            for col in target_generated_columns:
-                # If user defined mapping differently, do not change it.
-                # If it was mapped to target, we have already changed it
-                # in _process_column_mappings
-                run_output = f"${{run.outputs.{col}}}"
-                # We will add our mapping only if
-                # customer did not mapped target output.
-                if col not in mapping and run_output not in mapped_to_values:
-                    column_mapping[evaluator_name][col] = run_output  # pylint: disable=unnecessary-dict-index-lookup
+        # IMPORTANT FIX: For ProxyClient, create a temporary file with the complete dataframe
+        # This ensures that evaluators get all rows (including failed ones with NaN values)
+        if isinstance(batch_run_client, ProxyClient):
+            # Create a temporary JSONL file with the complete dataframe
+            temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
+            try:
+                for _, row in input_data_df.iterrows():
+                    row_dict = row.to_dict()
+                    temp_file.write(json.dumps(row_dict) + "\n")
+                temp_file.close()
+                batch_run_data = temp_file.name
+
+                # Update column mappings to use data references instead of run outputs
+                for evaluator_name, mapping in column_mapping.items():
+                    mapped_to_values = set(mapping.values())
+                    for col in target_generated_columns:
+                        # Use data reference instead of run output to ensure we get all rows
+                        target_reference = f"${{data.{Prefixes.TSG_OUTPUTS}{col}}}"
+
+                        # We will add our mapping only if customer did not map target output.
+                        if col not in mapping and target_reference not in mapped_to_values:
+                            column_mapping[evaluator_name][col] = target_reference
+
+                # Don't pass the target_run since we're now using the complete dataframe
+                target_run = None
+
+            except Exception as e:
+                # Clean up the temp file if something goes wrong
+                if os.path.exists(temp_file.name):
+                    os.unlink(temp_file.name)
+                raise e
+        else:
+            # For DataFrame-based clients, update batch_run_data to use the updated input_data_df
+            batch_run_data = input_data_df
+
+            # Update column mappings for DataFrame clients
+            for evaluator_name, mapping in column_mapping.items():
+                mapped_to_values = set(mapping.values())
+                for col in target_generated_columns:
+                    target_reference = f"${{data.{Prefixes.TSG_OUTPUTS}{col}}}"
+
+                    # We will add our mapping only if customer did not map target output.
+                    if col not in mapping and target_reference not in mapped_to_values:
+                        column_mapping[evaluator_name][col] = target_reference
 
     # After we have generated all columns, we can check if we have everything we need for evaluators.
     _validate_columns_for_evaluators(input_data_df, evaluators, target, target_generated_columns, column_mapping)
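The temp-file branch above boils down to a dump-to-JSONL-and-hand-off pattern. A self-contained sketch under the same assumptions (delete=False so the file outlives close(); the caller is responsible for removing it later):

    import json
    import os
    import tempfile

    import pandas as pd

    df = pd.DataFrame({"query": ["q0", "q1"], "__outputs.response": ["a", None]})

    temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
    try:
        for _, row in df.iterrows():
            temp_file.write(json.dumps(row.to_dict()) + "\n")
        temp_file.close()
        batch_run_data = temp_file.name  # hand the path to the batch client
    except Exception:
        temp_file.close()
        os.unlink(temp_file.name)
        raise

pandas could also write the file in one call with df.to_json(path, orient="records", lines=True); the explicit row loop just keeps per-row control over serialization.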
@@ -1062,30 +1116,50 @@ def _run_callable_evaluators(
     batch_run_data = validated_data["batch_run_data"]
     column_mapping = validated_data["column_mapping"]
     evaluators = validated_data["evaluators"]
-    with EvalRunContext(batch_run_client):
-        runs = {
-            evaluator_name: batch_run_client.run(
-                flow=evaluator,
-                data=batch_run_data,
-                run=target_run,
-                evaluator_name=evaluator_name,
-                column_mapping=column_mapping.get(evaluator_name, column_mapping.get("default", None)),
-                stream=True,
-                name=kwargs.get("_run_name"),
-            )
-            for evaluator_name, evaluator in evaluators.items()
-        }
 
-        # get_details needs to be called within EvalRunContext scope in order to have user agent populated
-        per_evaluator_results: Dict[str, __EvaluatorInfo] = {
-            evaluator_name: {
-                "result": batch_run_client.get_details(run, all_results=True),
-                "metrics": batch_run_client.get_metrics(run),
-                "run_summary": batch_run_client.get_run_summary(run),
+    # Clean up temporary file after evaluation if it was created
+    temp_file_to_cleanup = None
+    if (
+        isinstance(batch_run_client, ProxyClient)
+        and isinstance(batch_run_data, str)
+        and batch_run_data.endswith(".jsonl")
+    ):
+        # Check if it's a temporary file (contains temp directory path)
+        if tempfile.gettempdir() in batch_run_data:
+            temp_file_to_cleanup = batch_run_data
+
+    try:
+        with EvalRunContext(batch_run_client):
+            runs = {
+                evaluator_name: batch_run_client.run(
+                    flow=evaluator,
+                    data=batch_run_data,
+                    # Don't pass target_run when using complete dataframe
+                    run=target_run,
+                    evaluator_name=evaluator_name,
+                    column_mapping=column_mapping.get(evaluator_name, column_mapping.get("default", None)),
+                    stream=True,
+                    name=kwargs.get("_run_name"),
+                )
+                for evaluator_name, evaluator in evaluators.items()
             }
-        for evaluator_name, run in runs.items()
-    }
 
+            # get_details needs to be called within EvalRunContext scope in order to have user agent populated
+            per_evaluator_results: Dict[str, __EvaluatorInfo] = {
+                evaluator_name: {
+                    "result": batch_run_client.get_details(run, all_results=True),
+                    "metrics": batch_run_client.get_metrics(run),
+                    "run_summary": batch_run_client.get_run_summary(run),
+                }
+                for evaluator_name, run in runs.items()
+            }
+    finally:
+        # Clean up temporary file if it was created
+        if temp_file_to_cleanup and os.path.exists(temp_file_to_cleanup):
+            try:
+                os.unlink(temp_file_to_cleanup)
+            except Exception as e:
+                LOGGER.warning(f"Failed to clean up temporary file {temp_file_to_cleanup}: {e}")
     # Concatenate all results
     evaluators_result_df = pd.DataFrame()
     evaluators_metric = {}
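The cleanup guard is deliberately stateless: rather than threading a flag through validated_data, it re-derives "this is our temp file" from the path itself. A sketch of that heuristic under the same assumptions:

    import tempfile

    batch_run_data = tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False).name

    # True only for .jsonl paths under the system temp directory, which is
    # where the file created in _preprocess_data lives.
    is_temp_jsonl = batch_run_data.endswith(".jsonl") and tempfile.gettempdir() in batch_run_data
    print(is_temp_jsonl)  # True

Deferring the unlink to a finally block keeps the file on disk through get_details and get_run_summary, since the ProxyClient runs read the JSONL from its path, while still guaranteeing removal even when an evaluator run raises.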