Fill target errors with nans for evaluate #41919

Open

wants to merge 40 commits into base: main

Commits (40)
4318329
Prepare evals SDK Release
May 28, 2025
192b980
Fix bug
May 28, 2025
758adb4
Fix for ADV_CONV for FDP projects
May 29, 2025
de09fd1
Update release date
May 29, 2025
ef60fe6
Merge branch 'main' into main
nagkumar91 May 29, 2025
8ca51d0
Merge branch 'Azure:main' into main
nagkumar91 May 30, 2025
98bfc3a
Merge branch 'Azure:main' into main
nagkumar91 Jun 2, 2025
a5f32e8
Merge branch 'Azure:main' into main
nagkumar91 Jun 9, 2025
5fd88b6
Merge branch 'Azure:main' into main
nagkumar91 Jun 10, 2025
51f2b44
Merge branch 'Azure:main' into main
nagkumar91 Jun 10, 2025
a5be8b5
Merge branch 'Azure:main' into main
nagkumar91 Jun 16, 2025
75965b7
Merge branch 'Azure:main' into main
nagkumar91 Jun 25, 2025
d0c5e53
Merge branch 'Azure:main' into main
nagkumar91 Jun 25, 2025
b790276
Merge branch 'Azure:main' into main
nagkumar91 Jun 26, 2025
d5ca243
Merge branch 'Azure:main' into main
nagkumar91 Jun 26, 2025
8d62e36
re-add pyrit to matrix
Jun 26, 2025
59a70f2
Change grader ids
Jun 26, 2025
4d146d7
Merge branch 'Azure:main' into main
nagkumar91 Jun 26, 2025
f7a4c83
Update unit test
Jun 27, 2025
79e3a40
replace all old grader IDs in tests
Jun 27, 2025
588cbec
Merge branch 'main' into main
nagkumar91 Jun 30, 2025
7514472
Update platform-matrix.json
nagkumar91 Jun 30, 2025
28b2513
Update test to ensure everything is mocked
Jul 1, 2025
8603e0e
tox/black fixes
Jul 1, 2025
895f226
Skip that test with issues
Jul 1, 2025
b4b2daf
Merge branch 'Azure:main' into main
nagkumar91 Jul 1, 2025
023f07f
update grader ID according to API View feedback
Jul 1, 2025
45b5f5d
Update test
Jul 2, 2025
1ccb4db
remove string check for grader ID
Jul 2, 2025
6fd9aa5
Merge branch 'Azure:main' into main
nagkumar91 Jul 2, 2025
f871855
Update changelog and officialy start freeze
Jul 2, 2025
59ac230
update the enum according to suggestions
Jul 2, 2025
794a2c4
update the changelog
Jul 2, 2025
b33363c
Finalize logic
Jul 2, 2025
464e2dd
Merge branch 'Azure:main' into main
nagkumar91 Jul 3, 2025
98dc816
Fill the dataset when target doesn't respond with all columns
Jul 4, 2025
3943344
Tox fixes
Jul 4, 2025
7504164
Send dataframe instead of previous run
Jul 7, 2025
9f3d5bc
tox fixes
Jul 7, 2025
610f97f
Add a test
Jul 7, 2025
@@ -37,14 +37,13 @@ def run(
evaluator_name: Optional[str] = None,
**kwargs: Any,
) -> BatchClientRun:
if not isinstance(data, pd.DataFrame):
raise ValueError("Data must be a pandas DataFrame")
# if not isinstance(data, pd.DataFrame):
# raise ValueError("Data must be a pandas DataFrame")

# The column mappings are indexed by data to indicate they come from the data
# input. Update the inputs so that each entry is a dictionary with a data key
# that contains the original input data.
inputs = [{"data": input_data} for input_data in data.to_dict(orient="records")]

# Pass the correct previous run to the evaluator
run: Optional[BatchClientRun] = kwargs.pop("run", None)
if run:
@@ -73,6 +72,7 @@ def run(
return run_future

def get_details(self, client_run: BatchClientRun, all_results: bool = False) -> pd.DataFrame:

run = self._get_run(client_run)

data: Dict[str, List[Any]] = defaultdict(list)
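Note on the hunk above: the hard `isinstance(data, pd.DataFrame)` check is commented out, and each record of the frame is wrapped under a `"data"` key so that `${data.<column>}` mappings resolve against it, as the comment in `run()` describes. A minimal standalone sketch of that wrapping (the two-row frame and its column names are illustrative, not from the PR):

import pandas as pd

# Hypothetical two-row input; column names are illustrative only.
data = pd.DataFrame({"query": ["q1", "q2"], "ground_truth": ["g1", "g2"]})

# Each record is wrapped under a "data" key, matching the comment in run():
# mappings of the form ${data.<column>} then resolve against that key.
inputs = [{"data": input_data} for input_data in data.to_dict(orient="records")]

assert inputs == [
    {"data": {"query": "q1", "ground_truth": "g1"}},
    {"data": {"query": "q2", "ground_truth": "g2"}},
]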
@@ -7,6 +7,8 @@
import logging
import os
import re
import tempfile
import json
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypedDict, Union, cast

from openai import OpenAI, AzureOpenAI
@@ -611,13 +613,32 @@ def _apply_target_to_data(
category=ErrorCategory.FAILED_EXECUTION,
blame=ErrorBlame.USER_ERROR,
)

# Log a warning if some rows failed
failed_lines = run_summary.get("failed_lines", 0)
completed_lines = run_summary["completed_lines"]
total_lines = failed_lines + completed_lines

if failed_lines > 0:
LOGGER.warning(
f"Target function completed {completed_lines} out of {total_lines} rows. "
f"{failed_lines} rows failed and will be filled with NaN values."
)

# Remove input and output prefix
generated_columns = {
col[len(Prefixes.OUTPUTS) :] for col in target_output.columns if col.startswith(Prefixes.OUTPUTS)
}
# Sort output by line numbers
target_output.set_index(f"inputs.{LINE_NUMBER}", inplace=True)
target_output.sort_index(inplace=True)

initial_data_with_line_numbers = initial_data.copy()
initial_data_with_line_numbers[LINE_NUMBER] = range(len(initial_data))

complete_index = initial_data_with_line_numbers[LINE_NUMBER]
target_output = target_output.reindex(complete_index)

target_output.reset_index(inplace=True, drop=False)
# target_output contains only input columns, taken by function,
# so we need to concatenate it to the input data frame.
@@ -626,8 +647,8 @@
# Rename outputs columns to __outputs
rename_dict = {col: col.replace(Prefixes.OUTPUTS, Prefixes.TSG_OUTPUTS) for col in target_output.columns}
target_output.rename(columns=rename_dict, inplace=True)
# Concatenate output to input
target_output = pd.concat([target_output, initial_data], axis=1)
# Concatenate output to input - now both dataframes have the same number of rows
target_output = pd.concat([initial_data, target_output], axis=1)

return target_output, generated_columns, run
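The reindex step above is what actually fills failed target rows with NaN: the target output is indexed by line number, and reindexing it against the full range of input line numbers reinstates the missing rows as all-NaN before the concat. A small standalone sketch under assumed column names (only the `line_number` index and the `outputs.`/`__outputs.` prefixes mirror the PR, assuming those are the values behind the Prefixes constants):

import pandas as pd

# Hypothetical target output: line 1 failed, so it is simply absent.
target_output = pd.DataFrame({"inputs.line_number": [0, 2], "outputs.answer": ["a0", "a2"]})
initial_data = pd.DataFrame({"query": ["q0", "q1", "q2"]})

target_output.set_index("inputs.line_number", inplace=True)
target_output.sort_index(inplace=True)

# Reindex against every expected line number; missing rows become NaN.
complete_index = pd.RangeIndex(len(initial_data), name="line_number")
target_output = target_output.reindex(complete_index)
target_output.reset_index(inplace=True, drop=False)

target_output.rename(columns={"outputs.answer": "__outputs.answer"}, inplace=True)
result = pd.concat([initial_data, target_output], axis=1)
#   query  line_number __outputs.answer
# 0    q0            0               a0
# 1    q1            1              NaN
# 2    q2            2               a2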

@@ -1013,17 +1034,50 @@ def _preprocess_data(
target, batch_run_data, batch_run_client, input_data_df, evaluation_name, **kwargs
)

for evaluator_name, mapping in column_mapping.items():
mapped_to_values = set(mapping.values())
for col in target_generated_columns:
# If user defined mapping differently, do not change it.
# If it was mapped to target, we have already changed it
# in _process_column_mappings
run_output = f"${{run.outputs.{col}}}"
# We will add our mapping only if
# customer did not mapped target output.
if col not in mapping and run_output not in mapped_to_values:
column_mapping[evaluator_name][col] = run_output # pylint: disable=unnecessary-dict-index-lookup
# IMPORTANT FIX: For ProxyClient, create a temporary file with the complete dataframe
# This ensures that evaluators get all rows (including failed ones with NaN values)
if isinstance(batch_run_client, ProxyClient):
# Create a temporary JSONL file with the complete dataframe
temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
try:
for _, row in input_data_df.iterrows():
row_dict = row.to_dict()
temp_file.write(json.dumps(row_dict) + "\n")
temp_file.close()
batch_run_data = temp_file.name

# Update column mappings to use data references instead of run outputs
for evaluator_name, mapping in column_mapping.items():
mapped_to_values = set(mapping.values())
for col in target_generated_columns:
# Use data reference instead of run output to ensure we get all rows
target_reference = f"${{data.{Prefixes.TSG_OUTPUTS}{col}}}"

# We will add our mapping only if customer did not map target output.
if col not in mapping and target_reference not in mapped_to_values:
column_mapping[evaluator_name][col] = target_reference

# Don't pass the target_run since we're now using the complete dataframe
target_run = None

except Exception as e:
# Clean up the temp file if something goes wrong
if os.path.exists(temp_file.name):
os.unlink(temp_file.name)
raise e
else:
# For DataFrame-based clients, update batch_run_data to use the updated input_data_df
batch_run_data = input_data_df

# Update column mappings for DataFrame clients
for evaluator_name, mapping in column_mapping.items():
mapped_to_values = set(mapping.values())
for col in target_generated_columns:
target_reference = f"${{data.{Prefixes.TSG_OUTPUTS}{col}}}"

# We will add our mapping only if customer did not map target output.
if col not in mapping and target_reference not in mapped_to_values:
column_mapping[evaluator_name][col] = target_reference

# After we have generated all columns, we can check if we have everything we need for evaluators.
_validate_columns_for_evaluators(input_data_df, evaluators, target, target_generated_columns, column_mapping)
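The ProxyClient branch above ensures evaluators see every row, including failed ones filled with NaN: the complete dataframe is written to a temporary JSONL file, the evaluators read from that file, and the column mappings are rewritten from `${run.outputs.<col>}` to `${data.__outputs.<col>}` so they resolve against the file rather than the target run. A minimal sketch of the file-writing part (the frame and its columns are made up; in the PR the file is deleted later, in `_run_callable_evaluators`):

import json
import os
import tempfile

import pandas as pd

# Hypothetical complete dataframe: the failed row already carries None/NaN.
input_data_df = pd.DataFrame({"query": ["q0", "q1"], "__outputs.answer": ["a0", None]})

temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
try:
    # One JSON object per line, mirroring the loop in the ProxyClient branch.
    for _, row in input_data_df.iterrows():
        temp_file.write(json.dumps(row.to_dict()) + "\n")
finally:
    temp_file.close()

batch_run_data = temp_file.name  # the batch client now reads rows from this path

# ... evaluation would run here ...

os.unlink(batch_run_data)  # the PR defers this cleanup to a finally block after evaluation

For the DataFrame-based clients the same mapping rewrite happens in the else branch, but `batch_run_data` is simply pointed at `input_data_df` with no temporary file involved.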
@@ -1062,30 +1116,50 @@ def _run_callable_evaluators(
batch_run_data = validated_data["batch_run_data"]
column_mapping = validated_data["column_mapping"]
evaluators = validated_data["evaluators"]
with EvalRunContext(batch_run_client):
runs = {
evaluator_name: batch_run_client.run(
flow=evaluator,
data=batch_run_data,
run=target_run,
evaluator_name=evaluator_name,
column_mapping=column_mapping.get(evaluator_name, column_mapping.get("default", None)),
stream=True,
name=kwargs.get("_run_name"),
)
for evaluator_name, evaluator in evaluators.items()
}

# get_details needs to be called within EvalRunContext scope in order to have user agent populated
per_evaluator_results: Dict[str, __EvaluatorInfo] = {
evaluator_name: {
"result": batch_run_client.get_details(run, all_results=True),
"metrics": batch_run_client.get_metrics(run),
"run_summary": batch_run_client.get_run_summary(run),
}
for evaluator_name, run in runs.items()
}

# Clean up temporary file after evaluation if it was created
temp_file_to_cleanup = None
if (
isinstance(batch_run_client, ProxyClient)
and isinstance(batch_run_data, str)
and batch_run_data.endswith(".jsonl")
):
# Check if it's a temporary file (contains temp directory path)
if tempfile.gettempdir() in batch_run_data:
temp_file_to_cleanup = batch_run_data

try:
with EvalRunContext(batch_run_client):
runs = {
evaluator_name: batch_run_client.run(
flow=evaluator,
data=batch_run_data,
# Don't pass target_run when using complete dataframe
run=target_run,
evaluator_name=evaluator_name,
column_mapping=column_mapping.get(evaluator_name, column_mapping.get("default", None)),
stream=True,
name=kwargs.get("_run_name"),
)
for evaluator_name, evaluator in evaluators.items()
}

# get_details needs to be called within EvalRunContext scope in order to have user agent populated
per_evaluator_results: Dict[str, __EvaluatorInfo] = {
evaluator_name: {
"result": batch_run_client.get_details(run, all_results=True),
"metrics": batch_run_client.get_metrics(run),
"run_summary": batch_run_client.get_run_summary(run),
}
for evaluator_name, run in runs.items()
}
finally:
# Clean up temporary file if it was created
if temp_file_to_cleanup and os.path.exists(temp_file_to_cleanup):
try:
os.unlink(temp_file_to_cleanup)
except Exception as e:
LOGGER.warning(f"Failed to clean up temporary file {temp_file_to_cleanup}: {e}")
# Concatenate all results
evaluators_result_df = pd.DataFrame()
evaluators_metric = {}
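Taken together, the change means a target that raises on some rows no longer shrinks the evaluation output. A hedged end-to-end sketch of what a caller sees (this assumes the public `evaluate` entry point with `data`/`target`/`evaluators` keyword arguments; the file name, column names, target, and evaluator are illustrative and not part of this PR):

import math

from azure.ai.evaluation import evaluate


def flaky_target(query: str) -> dict:
    # Hypothetical target that fails for some inputs.
    if "bad" in query:
        raise RuntimeError("target failed on this row")
    return {"response": f"answer to {query}"}


def length_evaluator(response) -> dict:
    # With this PR, failed target rows reach the evaluator filled with NaN
    # instead of being dropped, so guard for that case (assumption about the
    # exact value the evaluator receives).
    if isinstance(response, float) and math.isnan(response):
        return {"length": math.nan}
    return {"length": len(response)}


# "data.jsonl" is an illustrative path whose rows contain a "query" column.
result = evaluate(
    data="data.jsonl",
    target=flaky_target,
    evaluators={"length": length_evaluator},
)
# Rows where flaky_target raised stay in the output, aligned with the input
# file, with NaN in the target-generated columns.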