diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/_run_submitter_client.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/_run_submitter_client.py
index 0c6010e41c99..ffd169119c62 100644
--- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/_run_submitter_client.py
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/_run_submitter_client.py
@@ -44,7 +44,6 @@ def run(
         # input. Update the inputs so that each entry is a dictionary with a data key
         # that contains the original input data.
         inputs = [{"data": input_data} for input_data in data.to_dict(orient="records")]

-        # Pass the correct previous run to the evaluator
         run: Optional[BatchClientRun] = kwargs.pop("run", None)
         if run:
@@ -73,6 +72,7 @@ def run(
         return run_future

     def get_details(self, client_run: BatchClientRun, all_results: bool = False) -> pd.DataFrame:
+
         run = self._get_run(client_run)

         data: Dict[str, List[Any]] = defaultdict(list)
diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py
index 9dee21d31eb4..fea768740277 100644
--- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py
@@ -7,6 +7,8 @@
 import logging
 import os
 import re
+import tempfile
+import json
 from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypedDict, Union, cast

 from openai import OpenAI, AzureOpenAI
@@ -611,6 +613,18 @@ def _apply_target_to_data(
             category=ErrorCategory.FAILED_EXECUTION,
             blame=ErrorBlame.USER_ERROR,
         )
+
+    # Log a warning if some rows failed
+    failed_lines = run_summary.get("failed_lines", 0)
+    completed_lines = run_summary["completed_lines"]
+    total_lines = failed_lines + completed_lines
+
+    if failed_lines > 0:
+        LOGGER.warning(
+            f"Target function completed {completed_lines} out of {total_lines} rows. "
+            f"{failed_lines} rows failed and will be filled with NaN values."
+        )
+
     # Remove input and output prefix
     generated_columns = {
         col[len(Prefixes.OUTPUTS) :] for col in target_output.columns if col.startswith(Prefixes.OUTPUTS)
@@ -618,6 +632,13 @@ def _apply_target_to_data(
     # Sort output by line numbers
     target_output.set_index(f"inputs.{LINE_NUMBER}", inplace=True)
     target_output.sort_index(inplace=True)
+
+    initial_data_with_line_numbers = initial_data.copy()
+    initial_data_with_line_numbers[LINE_NUMBER] = range(len(initial_data))
+
+    complete_index = initial_data_with_line_numbers[LINE_NUMBER]
+    target_output = target_output.reindex(complete_index)
+
     target_output.reset_index(inplace=True, drop=False)
     # target_output contains only input columns, taken by function,
     # so we need to concatenate it to the input data frame.
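Note: the two hunks above are the core row-alignment change in `_apply_target_to_data`: the target output is reindexed against the full 0..N-1 line-number range before it is concatenated with the inputs, so lines the target never produced come back as NaN rows instead of being dropped. A minimal pandas sketch of that idea (illustrative frame and column names only, not the SDK's internals):

```python
# Minimal sketch of the reindex-based alignment used above: rows the target
# never produced come back as NaN instead of disappearing.
import pandas as pd

initial_data = pd.DataFrame({"query": ["q0", "q1", "q2", "q3"]})

# Suppose the target only succeeded for line numbers 2 and 3.
target_output = pd.DataFrame(
    {"inputs.line_number": [2, 3], "__outputs.response": ["r2", "r3"]}
).set_index("inputs.line_number")

# Reindex against the full 0..N-1 range, then concatenate column-wise with the
# original inputs so both frames have the same number of rows.
aligned = target_output.reindex(pd.RangeIndex(len(initial_data))).reset_index(drop=True)
combined = pd.concat([initial_data, aligned], axis=1)

print(combined)
# Rows 0 and 1 now exist with NaN in "__outputs.response"; rows 2 and 3 keep their values.
```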
@@ -626,8 +647,8 @@ def _apply_target_to_data(
     # Rename outputs columns to __outputs
     rename_dict = {col: col.replace(Prefixes.OUTPUTS, Prefixes.TSG_OUTPUTS) for col in target_output.columns}
     target_output.rename(columns=rename_dict, inplace=True)
-    # Concatenate output to input
-    target_output = pd.concat([target_output, initial_data], axis=1)
+    # Concatenate output to input - now both dataframes have the same number of rows
+    target_output = pd.concat([initial_data, target_output], axis=1)

     return target_output, generated_columns, run
@@ -645,7 +666,7 @@ def _process_column_mappings(

     processed_config: Dict[str, Dict[str, str]] = {}

-    expected_references = re.compile(r"^\$\{(target|data)\.[a-zA-Z0-9_]+\}$")
+    expected_references = re.compile(r"^\$\{(target|data)\.([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+)*)\}$")

     if column_mapping:
         for evaluator, mapping_config in column_mapping.items():
@@ -1013,17 +1034,50 @@ def _preprocess_data(
             target, batch_run_data, batch_run_client, input_data_df, evaluation_name, **kwargs
         )

-        for evaluator_name, mapping in column_mapping.items():
-            mapped_to_values = set(mapping.values())
-            for col in target_generated_columns:
-                # If user defined mapping differently, do not change it.
-                # If it was mapped to target, we have already changed it
-                # in _process_column_mappings
-                run_output = f"${{run.outputs.{col}}}"
-                # We will add our mapping only if
-                # customer did not mapped target output.
-                if col not in mapping and run_output not in mapped_to_values:
-                    column_mapping[evaluator_name][col] = run_output  # pylint: disable=unnecessary-dict-index-lookup
+        # IMPORTANT FIX: For ProxyClient, create a temporary file with the complete dataframe
+        # This ensures that evaluators get all rows (including failed ones with NaN values)
+        if isinstance(batch_run_client, ProxyClient):
+            # Create a temporary JSONL file with the complete dataframe
+            temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
+            try:
+                for _, row in input_data_df.iterrows():
+                    row_dict = row.to_dict()
+                    temp_file.write(json.dumps(row_dict) + "\n")
+                temp_file.close()
+                batch_run_data = temp_file.name
+
+                # Update column mappings to use data references instead of run outputs
+                for evaluator_name, mapping in column_mapping.items():
+                    mapped_to_values = set(mapping.values())
+                    for col in target_generated_columns:
+                        # Use data reference instead of run output to ensure we get all rows
+                        target_reference = f"${{data.{Prefixes.TSG_OUTPUTS}{col}}}"
+
+                        # We will add our mapping only if customer did not map target output.
+                        if col not in mapping and target_reference not in mapped_to_values:
+                            column_mapping[evaluator_name][col] = target_reference
+
+                # Don't pass the target_run since we're now using the complete dataframe
+                target_run = None
+
+            except Exception as e:
+                # Clean up the temp file if something goes wrong
+                if os.path.exists(temp_file.name):
+                    os.unlink(temp_file.name)
+                raise e
+        else:
+            # For DataFrame-based clients, update batch_run_data to use the updated input_data_df
+            batch_run_data = input_data_df
+
+            # Update column mappings for DataFrame clients
+            for evaluator_name, mapping in column_mapping.items():
+                mapped_to_values = set(mapping.values())
+                for col in target_generated_columns:
+                    target_reference = f"${{data.{Prefixes.TSG_OUTPUTS}{col}}}"
+
+                    # We will add our mapping only if customer did not map target output.
+                    if col not in mapping and target_reference not in mapped_to_values:
+                        column_mapping[evaluator_name][col] = target_reference

     # After we have generated all columns, we can check if we have everything we need for evaluators.
     _validate_columns_for_evaluators(input_data_df, evaluators, target, target_generated_columns, column_mapping)
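Note: for `ProxyClient` the hunk above materializes the merged dataframe into a temporary JSONL file and rewrites unmapped generated columns to `${data.__outputs.<col>}` references, so evaluators read every row (including NaN rows for failed lines) from the file rather than from the previous target run. A rough sketch of that behaviour, not the SDK implementation: the helper names are illustrative, and the NaN-to-None conversion mirrors what the new `test_temp_file_creation_handles_nan_values` test below exercises, since `json.dumps` would otherwise emit invalid `NaN` tokens for missing values.

```python
# Illustrative sketch: write a dataframe to a temp JSONL file and add
# "${data.__outputs.<col>}" mappings for columns the user left unmapped.
import json
import tempfile

import pandas as pd


def dataframe_to_temp_jsonl(df: pd.DataFrame) -> str:
    """Write one JSON object per row and return the temp file path."""
    temp = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
    try:
        for _, row in df.iterrows():
            # Convert NaN to None so the row serializes as valid JSON (null).
            record = {k: (None if pd.isna(v) else v) for k, v in row.items()}
            temp.write(json.dumps(record) + "\n")
    finally:
        temp.close()
    return temp.name


def map_generated_columns(mapping: dict, generated_columns: set) -> dict:
    """Add data references for target-generated columns the user left unmapped."""
    taken = set(mapping.values())
    for col in generated_columns:
        ref = f"${{data.__outputs.{col}}}"
        if col not in mapping and ref not in taken:
            mapping[col] = ref
    return mapping
```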
@@ -1062,30 +1116,50 @@ def _run_callable_evaluators(
     batch_run_data = validated_data["batch_run_data"]
     column_mapping = validated_data["column_mapping"]
     evaluators = validated_data["evaluators"]
-    with EvalRunContext(batch_run_client):
-        runs = {
-            evaluator_name: batch_run_client.run(
-                flow=evaluator,
-                data=batch_run_data,
-                run=target_run,
-                evaluator_name=evaluator_name,
-                column_mapping=column_mapping.get(evaluator_name, column_mapping.get("default", None)),
-                stream=True,
-                name=kwargs.get("_run_name"),
-            )
-            for evaluator_name, evaluator in evaluators.items()
-        }
-        # get_details needs to be called within EvalRunContext scope in order to have user agent populated
-        per_evaluator_results: Dict[str, __EvaluatorInfo] = {
-            evaluator_name: {
-                "result": batch_run_client.get_details(run, all_results=True),
-                "metrics": batch_run_client.get_metrics(run),
-                "run_summary": batch_run_client.get_run_summary(run),
+    # Clean up temporary file after evaluation if it was created
+    temp_file_to_cleanup = None
+    if (
+        isinstance(batch_run_client, ProxyClient)
+        and isinstance(batch_run_data, str)
+        and batch_run_data.endswith(".jsonl")
+    ):
+        # Check if it's a temporary file (contains temp directory path)
+        if tempfile.gettempdir() in batch_run_data:
+            temp_file_to_cleanup = batch_run_data
+
+    try:
+        with EvalRunContext(batch_run_client):
+            runs = {
+                evaluator_name: batch_run_client.run(
+                    flow=evaluator,
+                    data=batch_run_data,
+                    # Don't pass target_run when using complete dataframe
+                    run=target_run,
+                    evaluator_name=evaluator_name,
+                    column_mapping=column_mapping.get(evaluator_name, column_mapping.get("default", None)),
+                    stream=True,
+                    name=kwargs.get("_run_name"),
+                )
+                for evaluator_name, evaluator in evaluators.items()
+            }
-            for evaluator_name, run in runs.items()
-        }
+            # get_details needs to be called within EvalRunContext scope in order to have user agent populated
+            per_evaluator_results: Dict[str, __EvaluatorInfo] = {
+                evaluator_name: {
+                    "result": batch_run_client.get_details(run, all_results=True),
+                    "metrics": batch_run_client.get_metrics(run),
+                    "run_summary": batch_run_client.get_run_summary(run),
+                }
+                for evaluator_name, run in runs.items()
+            }
+    finally:
+        # Clean up temporary file if it was created
+        if temp_file_to_cleanup and os.path.exists(temp_file_to_cleanup):
+            try:
+                os.unlink(temp_file_to_cleanup)
+            except Exception as e:
+                LOGGER.warning(f"Failed to clean up temporary file {temp_file_to_cleanup}: {e}")

     # Concatenate all results
     evaluators_result_df = pd.DataFrame()
     evaluators_metric = {}
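Note: the `_process_column_mappings` regex had to be widened because the new mappings reference dotted column names. A standalone check (not SDK code) of the old versus new pattern taken from the hunk above:

```python
# Why the reference pattern was widened: "${data.__outputs.<col>}" contains a
# dotted name, which the old pattern rejects.
import re

OLD = re.compile(r"^\$\{(target|data)\.[a-zA-Z0-9_]+\}$")
NEW = re.compile(r"^\$\{(target|data)\.([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+)*)\}$")

for ref in ("${data.query}", "${target.response}", "${data.__outputs.response}"):
    print(ref, bool(OLD.match(ref)), bool(NEW.match(ref)))
# ${data.query} True True
# ${target.response} True True
# ${data.__outputs.response} False True
```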
"${data.query}"}}}, {"default": {}, "question_ev": {"column_mapping": {"query": "${data.query}"}}}, - {"default": {}, "question_ev": {"column_mapping": {"query": "${target.query}"}}}, - {"default": {}, "question_ev": {"column_mapping": {"another_question": "${target.query}"}}}, - {"default": {"column_mapping": {"another_question": "${target.query}"}}}, + {"default": {}, "question_ev": {"column_mapping": {"query": "${data.__outputs.query}"}}}, + {"default": {}, "question_ev": {"column_mapping": {"another_question": "${data.__outputs.query}"}}}, + {"default": {"column_mapping": {"another_question": "${data.__outputs.query}"}}}, ], ) def test_evaluate_another_questions(self, questions_file, evaluation_config, run_from_temp_dir): @@ -241,7 +241,7 @@ def test_evaluate_another_questions(self, questions_file, evaluation_config, run if evaluation_config: config = evaluation_config.get("question_ev", evaluation_config.get("default", None)) mapping = config.get("column_mapping", config) - if mapping and ("another_question" in mapping or mapping["query"] == "${data.query}"): + if mapping and ("another_question" in mapping or mapping.get("query") == "${data.query}"): query = "inputs.query" expected = list(row_result_df[query].str.len()) assert expected == list(row_result_df["outputs.question_ev.length"]) @@ -259,7 +259,7 @@ def test_evaluate_another_questions(self, questions_file, evaluation_config, run }, "answer": { "column_mapping": { - "response": "${target.response}", + "response": "${data.__outputs.response}", } }, } @@ -268,7 +268,7 @@ def test_evaluate_another_questions(self, questions_file, evaluation_config, run { "default": { "column_mapping": { - "response": "${target.response}", + "response": "${data.__outputs.response}", "ground_truth": "${data.ground_truth}", } }, diff --git a/sdk/evaluation/azure-ai-evaluation/tests/unittests/test_evaluate_mismatch.py b/sdk/evaluation/azure-ai-evaluation/tests/unittests/test_evaluate_mismatch.py new file mode 100644 index 000000000000..f1c6ae16845e --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/tests/unittests/test_evaluate_mismatch.py @@ -0,0 +1,488 @@ +import json +import math +import os +import pathlib +import tempfile +import pytest +import pandas as pd +from unittest.mock import Mock, patch, mock_open, MagicMock +from pandas.testing import assert_frame_equal + +from azure.ai.evaluation import evaluate, F1ScoreEvaluator +from azure.ai.evaluation._evaluate._evaluate import ( + _preprocess_data, + _run_callable_evaluators, + __ValidatedData, # Keep double underscore +) +from azure.ai.evaluation._evaluate._batch_run import ProxyClient, CodeClient, RunSubmitterClient +from azure.ai.evaluation._constants import Prefixes +from azure.ai.evaluation._exceptions import EvaluationException + +# Create alias to avoid name mangling issues in class scope +ValidatedData = __ValidatedData + + +def _get_file(name): + """Get the file from the unittest data folder.""" + data_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data") + return os.path.join(data_path, name) + + +def _target_with_failures(query): + """A target function that fails for certain inputs.""" + if "LV-426" in query: + raise Exception("Target failure for LV-426") + if "central heating" in query: + raise Exception("Target failure for central heating") + return {"response": f"Response to: {query}"} + + +def _successful_target(query): + """A target function that always succeeds.""" + return {"response": f"Response to: {query}"} + + +def _simple_evaluator(query, response): + """A 
+    """A simple evaluator for testing."""
+    return {"score": len(response) if response else 0}
+
+
+@pytest.fixture
+def sample_questions_file():
+    """Create a temporary test file with sample questions."""
+    test_data = [
+        {"query": "How long is flight from Earth to LV-426?"},
+        {"query": "Why there is no central heating on the street?"},
+        {"query": "Why these questions are so strange?"},
+        {"query": "What is the weather like today?"},
+    ]
+
+    temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
+    for item in test_data:
+        temp_file.write(json.dumps(item) + "\n")
+    temp_file.close()
+
+    yield temp_file.name
+
+    # Cleanup
+    if os.path.exists(temp_file.name):
+        os.unlink(temp_file.name)
+
+
+@pytest.fixture
+def sample_dataframe_with_target_outputs():
+    """Create a sample dataframe with target outputs including failures."""
+    return pd.DataFrame(
+        {
+            "query": [
+                "How long is flight from Earth to LV-426?",
+                "Why there is no central heating on the street?",
+                "Why these questions are so strange?",
+                "What is the weather like today?",
+            ],
+            "__outputs.response": [
+                None,  # Failed
+                None,  # Failed
+                "Response to: Why these questions are so strange?",  # Success
+                "Response to: What is the weather like today?",  # Success
+            ],
+            "line_number": [0, 1, 2, 3],
+        }
+    )
+
+
+@pytest.mark.unittest
+class TestTargetFailureHandling:
+    """Test cases for target failure handling functionality."""
+
+    @patch("azure.ai.evaluation._evaluate._evaluate._apply_target_to_data")
+    @patch("azure.ai.evaluation._evaluate._evaluate._validate_and_load_data")
+    def test_preprocess_data_creates_temp_file_for_proxy_client_with_target_failures(
+        self, mock_load_data, mock_apply_target, sample_dataframe_with_target_outputs
+    ):
+        """Test that _preprocess_data creates a temporary file for ProxyClient when target has failures."""
+        # Setup mocks
+        mock_load_data.return_value = pd.DataFrame({"query": ["test"]})
+        mock_apply_target.return_value = (sample_dataframe_with_target_outputs, {"response"}, Mock())
+
+        # Test data
+        evaluators_and_graders = {"test_eval": _simple_evaluator}
+
+        with patch("tempfile.NamedTemporaryFile") as mock_temp_file:
+            mock_file = Mock()
+            mock_file.name = "/tmp/test_temp_file.jsonl"
+            mock_file.__enter__ = Mock(return_value=mock_file)
+            mock_file.__exit__ = Mock(return_value=None)
+            mock_temp_file.return_value = mock_file
+
+            with patch("json.dumps") as mock_json_dumps:
+                mock_json_dumps.return_value = '{"test": "data"}'
+
+                result = _preprocess_data(
+                    data="/test/path.jsonl",
+                    evaluators_and_graders=evaluators_and_graders,
+                    target=_target_with_failures,
+                    _use_pf_client=True,
+                )
+
+                # Verify temp file was created
+                mock_temp_file.assert_called_once()
+
+                # Verify batch_run_data points to temp file
+                assert result["batch_run_data"] == "/tmp/test_temp_file.jsonl"
+
+                # Verify target_run is None (we don't use previous run)
+                assert result["target_run"] is None
+
+                # Verify column mapping uses data references instead of run outputs
+                assert "response" in result["column_mapping"]["default"]
+                assert result["column_mapping"]["default"]["response"] == "${data.__outputs.response}"
+
+    @patch("azure.ai.evaluation._evaluate._evaluate._apply_target_to_data")
+    @patch("azure.ai.evaluation._evaluate._evaluate._validate_and_load_data")
+    def test_preprocess_data_uses_dataframe_for_non_proxy_clients_with_target_failures(
+        self, mock_load_data, mock_apply_target, sample_dataframe_with_target_outputs
+    ):
failures.""" + # Setup mocks + mock_load_data.return_value = pd.DataFrame({"query": ["test"]}) + mock_apply_target.return_value = (sample_dataframe_with_target_outputs, {"response"}, Mock()) + + # Test data + evaluators_and_graders = {"test_eval": _simple_evaluator} + + result = _preprocess_data( + data="/test/path.jsonl", + evaluators_and_graders=evaluators_and_graders, + target=_target_with_failures, + _use_run_submitter_client=True, + ) + + # Verify batch_run_data is the dataframe + assert isinstance(result["batch_run_data"], pd.DataFrame) + assert_frame_equal(result["batch_run_data"], sample_dataframe_with_target_outputs) + + # Verify column mapping uses data references + assert "response" in result["column_mapping"]["default"] + assert result["column_mapping"]["default"]["response"] == "${data.__outputs.response}" + + @patch("azure.ai.evaluation._evaluate._evaluate.json.dumps") + @patch("azure.ai.evaluation._evaluate._evaluate.pd.isna") + def test_temp_file_creation_handles_nan_values( + self, mock_isna, mock_json_dumps, sample_dataframe_with_target_outputs + ): + """Test that NaN values are properly converted to None in temp file creation.""" + # Setup mocks - simulate NaN detection + mock_isna.side_effect = lambda x: x is None + mock_json_dumps.return_value = '{"test": "data"}' + + with patch("tempfile.NamedTemporaryFile") as mock_temp_file: + mock_file = Mock() + mock_file.name = "/tmp/test.jsonl" + mock_file.write = Mock() + mock_file.close = Mock() + mock_temp_file.return_value = mock_file + + with patch("azure.ai.evaluation._evaluate._evaluate._apply_target_to_data") as mock_apply_target: + with patch("azure.ai.evaluation._evaluate._evaluate._validate_and_load_data") as mock_load_data: + mock_load_data.return_value = pd.DataFrame({"query": ["test"]}) + mock_apply_target.return_value = (sample_dataframe_with_target_outputs, {"response"}, Mock()) + + _preprocess_data( + data="/test/path.jsonl", + evaluators_and_graders={"test_eval": _simple_evaluator}, + target=_target_with_failures, + _use_pf_client=True, + ) + + # Verify json.dumps was called (temp file creation happened) + assert mock_json_dumps.call_count > 0 + + def test_temp_file_cleanup_on_exception(self): + """Test that temporary files are cleaned up when exceptions occur.""" + with patch("tempfile.NamedTemporaryFile") as mock_temp_file: + mock_file = Mock() + mock_file.name = "/tmp/test_temp_file.jsonl" + mock_temp_file.return_value = mock_file + + with patch("os.path.exists") as mock_exists: + with patch("os.unlink") as mock_unlink: + mock_exists.return_value = True + + with patch("azure.ai.evaluation._evaluate._evaluate._apply_target_to_data") as mock_apply_target: + with patch("azure.ai.evaluation._evaluate._evaluate._validate_and_load_data") as mock_load_data: + mock_load_data.return_value = pd.DataFrame({"query": ["test"]}) + mock_apply_target.return_value = ( + pd.DataFrame({"query": ["test"], "__outputs.response": ["response"]}), + {"response"}, + Mock(), + ) + + # Mock json.dumps to raise an exception + with patch("json.dumps", side_effect=Exception("JSON error")): + with pytest.raises(Exception): + _preprocess_data( + data="/test/path.jsonl", + evaluators_and_graders={"test_eval": _simple_evaluator}, + target=_target_with_failures, + _use_pf_client=True, + ) + + # Verify cleanup was attempted + mock_unlink.assert_called_once_with("/tmp/test_temp_file.jsonl") + + @patch("azure.ai.evaluation._evaluate._evaluate.EvalRunContext") + def test_run_callable_evaluators_temp_file_cleanup(self, mock_eval_context): + """Test 
+        """Test that _run_callable_evaluators cleans up temporary files."""
+        # Create mock validated data with temp file
+        temp_file_path = "/tmp/test_eval_temp.jsonl"
+        validated_data = ValidatedData(
+            evaluators={"test_eval": _simple_evaluator},
+            graders={},
+            input_data_df=pd.DataFrame({"query": ["test"], "__outputs.response": ["response"]}),
+            column_mapping={"default": {"response": "${data.__outputs.response}"}},
+            target_run=None,
+            batch_run_client=Mock(spec=ProxyClient),
+            batch_run_data=temp_file_path,
+        )
+
+        # Mock the batch client run methods
+        mock_run = Mock()
+        validated_data["batch_run_client"].run.return_value = mock_run
+        validated_data["batch_run_client"].get_details.return_value = pd.DataFrame({"outputs.test_eval.score": [10]})
+        validated_data["batch_run_client"].get_metrics.return_value = {}
+        validated_data["batch_run_client"].get_run_summary.return_value = {"failed_lines": 0, "status": "Completed"}
+
+        with patch("tempfile.gettempdir", return_value="/tmp"):
+            with patch("os.path.exists") as mock_exists:
+                with patch("os.unlink") as mock_unlink:
+                    mock_exists.return_value = True
+
+                    # Run the function
+                    _run_callable_evaluators(validated_data)
+
+                    # Verify cleanup was called
+                    mock_unlink.assert_called_once_with(temp_file_path)
+
+    @patch("azure.ai.evaluation._evaluate._evaluate.EvalRunContext")
+    def test_run_callable_evaluators_no_cleanup_for_non_temp_files(self, mock_eval_context):
+        """Test that _run_callable_evaluators doesn't clean up non-temp files."""
+        # Create mock validated data with regular file (not in temp directory)
+        regular_file_path = "/data/test_eval.jsonl"
+        validated_data = ValidatedData(
+            evaluators={"test_eval": _simple_evaluator},
+            graders={},
+            input_data_df=pd.DataFrame({"query": ["test"], "__outputs.response": ["response"]}),
+            column_mapping={"default": {"response": "${data.__outputs.response}"}},
+            target_run=None,
+            batch_run_client=Mock(spec=ProxyClient),
+            batch_run_data=regular_file_path,
+        )
+
+        # Mock the batch client run methods
+        mock_run = Mock()
+        validated_data["batch_run_client"].run.return_value = mock_run
+        validated_data["batch_run_client"].get_details.return_value = pd.DataFrame({"outputs.test_eval.score": [10]})
+        validated_data["batch_run_client"].get_metrics.return_value = {}
+        validated_data["batch_run_client"].get_run_summary.return_value = {"failed_lines": 0, "status": "Completed"}
+
+        with patch("tempfile.gettempdir", return_value="/tmp"):
+            with patch("os.unlink") as mock_unlink:
+                # Run the function
+                _run_callable_evaluators(validated_data)
+
+                # Verify cleanup was NOT called for non-temp file
+                mock_unlink.assert_not_called()
+
+    def test_column_mapping_uses_data_reference_for_proxy_client_with_target(self):
+        """Test that column mapping uses ${data.__outputs.column} for ProxyClient with target failures."""
+        with patch("azure.ai.evaluation._evaluate._evaluate._apply_target_to_data") as mock_apply_target:
+            with patch("azure.ai.evaluation._evaluate._evaluate._validate_and_load_data") as mock_load_data:
+                mock_load_data.return_value = pd.DataFrame({"query": ["test"]})
+                mock_apply_target.return_value = (
+                    pd.DataFrame({"query": ["test"], "__outputs.response": ["response"]}),
+                    {"response"},
+                    Mock(),
+                )
+
+                with patch("tempfile.NamedTemporaryFile") as mock_temp_file:
+                    mock_file = Mock()
+                    mock_file.name = "/tmp/test.jsonl"
+                    mock_file.close = Mock()
+                    mock_temp_file.return_value = mock_file
+
+                    with patch("json.dumps"):
+                        result = _preprocess_data(
+                            data="/test/path.jsonl",
+                            evaluators_and_graders={"test_eval": _simple_evaluator},
+                            target=_target_with_failures,
+                            _use_pf_client=True,
+                        )
+
+                        # Verify column mapping uses data reference
+                        assert result["column_mapping"]["default"]["response"] == "${data.__outputs.response}"
+
+    def test_column_mapping_uses_data_reference_for_dataframe_clients_with_target(self):
+        """Test that column mapping uses ${data.__outputs.column} for DataFrame clients with target."""
+        with patch("azure.ai.evaluation._evaluate._evaluate._apply_target_to_data") as mock_apply_target:
+            with patch("azure.ai.evaluation._evaluate._evaluate._validate_and_load_data") as mock_load_data:
+                mock_load_data.return_value = pd.DataFrame({"query": ["test"]})
+                mock_apply_target.return_value = (
+                    pd.DataFrame({"query": ["test"], "__outputs.response": ["response"]}),
+                    {"response"},
+                    Mock(),
+                )
+
+                result = _preprocess_data(
+                    data="/test/path.jsonl",
+                    evaluators_and_graders={"test_eval": _simple_evaluator},
+                    target=_target_with_failures,
+                    _use_run_submitter_client=True,
+                )
+
+                # Verify column mapping uses data reference
+                assert result["column_mapping"]["default"]["response"] == "${data.__outputs.response}"
+
+    @patch("azure.ai.evaluation._evaluate._evaluate.EvalRunContext")
+    def test_run_callable_evaluators_doesnt_pass_target_run_when_using_complete_dataframe(self, mock_eval_context):
+        """Test that target_run is not passed when using complete dataframe with ProxyClient."""
+        validated_data = ValidatedData(
+            evaluators={"test_eval": _simple_evaluator},
+            graders={},
+            input_data_df=pd.DataFrame({"query": ["test"], "__outputs.response": ["response"]}),
+            column_mapping={"default": {"response": "${data.__outputs.response}"}},
+            target_run=Mock(),  # This should not be passed to run()
+            batch_run_client=Mock(spec=ProxyClient),
+            batch_run_data="/tmp/test_temp.jsonl",
+        )
+
+        # Mock the batch client run methods
+        mock_run = Mock()
+        validated_data["batch_run_client"].run.return_value = mock_run
+        validated_data["batch_run_client"].get_details.return_value = pd.DataFrame({"outputs.test_eval.score": [10]})
+        validated_data["batch_run_client"].get_metrics.return_value = {}
+        validated_data["batch_run_client"].get_run_summary.return_value = {"failed_lines": 0, "status": "Completed"}
+
+        with patch("tempfile.gettempdir", return_value="/tmp"):
+            with patch("os.path.exists", return_value=True):
+                with patch("os.unlink"):
+                    _run_callable_evaluators(validated_data)
+
+                    # Verify run was called with target_run (the original target_run should still be passed)
+                    validated_data["batch_run_client"].run.assert_called_once()
+                    call_args = validated_data["batch_run_client"].run.call_args
+                    assert "run" in call_args[1]  # target_run should be passed in kwargs
+
+    @patch("azure.ai.evaluation._evaluate._evaluate.LOGGER")
+    def test_temp_file_cleanup_warning_on_failure(self, mock_logger):
+        """Test that warnings are logged when temp file cleanup fails."""
+        validated_data = ValidatedData(
+            evaluators={"test_eval": _simple_evaluator},
+            graders={},
+            input_data_df=pd.DataFrame({"query": ["test"], "__outputs.response": ["response"]}),
+            column_mapping={"default": {"response": "${data.__outputs.response}"}},
+            target_run=None,
+            batch_run_client=Mock(spec=ProxyClient),
+            batch_run_data="/tmp/test_temp.jsonl",
+        )
+
+        # Mock the batch client run methods
+        mock_run = Mock()
+        validated_data["batch_run_client"].run.return_value = mock_run
+        validated_data["batch_run_client"].get_details.return_value = pd.DataFrame({"outputs.test_eval.score": [10]})
+        validated_data["batch_run_client"].get_metrics.return_value = {}
+        validated_data["batch_run_client"].get_run_summary.return_value = {"failed_lines": 0, "status": "Completed"}
+
+        with patch("tempfile.gettempdir", return_value="/tmp"):
+            with patch("os.path.exists", return_value=True):
+                with patch("os.unlink", side_effect=Exception("Cleanup failed")):
+                    with patch("azure.ai.evaluation._evaluate._evaluate.EvalRunContext"):
+                        _run_callable_evaluators(validated_data)
+
+                        # Verify warning was logged
+                        mock_logger.warning.assert_called_once()
+                        warning_call = mock_logger.warning.call_args[0][0]
+                        assert "Failed to clean up temporary file" in warning_call
+                        assert "/tmp/test_temp.jsonl" in warning_call
+
+    @patch("azure.ai.evaluation._evaluate._evaluate._validate_columns_for_evaluators")
+    @patch("azure.ai.evaluation._evaluate._evaluate._apply_target_to_data")
+    @patch("azure.ai.evaluation._evaluate._evaluate._validate_and_load_data")
+    def test_preprocess_data_no_temp_file_without_target(
+        self, mock_load_data, mock_apply_target, mock_validate_columns
+    ):
+        """Test that no temp file is created when there's no target function."""
+        mock_load_data.return_value = pd.DataFrame({"query": ["test"], "response": ["response"]})
+
+        with patch("tempfile.NamedTemporaryFile") as mock_temp_file:
+            result = _preprocess_data(
+                data="/test/path.jsonl",
+                evaluators_and_graders={"test_eval": _simple_evaluator},
+                target=None,  # No target
+                _use_pf_client=True,
+            )
+
+            # Verify no temp file was created
+            mock_temp_file.assert_not_called()
+
+            # Verify batch_run_data is still the original file path
+            assert result["batch_run_data"] == os.path.abspath("/test/path.jsonl")
+
+    def test_temp_file_creation_path_with_proxy_client(self):
+        """Test that the temp file creation path is exercised for ProxyClient."""
+        with patch("azure.ai.evaluation._evaluate._evaluate._apply_target_to_data") as mock_apply_target:
+            with patch("azure.ai.evaluation._evaluate._evaluate._validate_and_load_data") as mock_load_data:
+                mock_load_data.return_value = pd.DataFrame({"query": ["test"]})
+                mock_apply_target.return_value = (
+                    pd.DataFrame({"query": ["test"], "__outputs.response": ["response"]}),
+                    {"response"},
+                    Mock(),
+                )
+
+                with patch("tempfile.NamedTemporaryFile") as mock_temp_file:
+                    mock_file = Mock()
+                    mock_file.name = "/tmp/eval_temp.jsonl"
+                    mock_file.close = Mock()
+                    mock_temp_file.return_value = mock_file
+
+                    with patch("json.dumps", return_value='{"test": "data"}') as mock_json_dumps:
+                        result = _preprocess_data(
+                            data="/test/path.jsonl",
+                            evaluators_and_graders={"test_eval": _simple_evaluator},
+                            target=_target_with_failures,
+                            _use_pf_client=True,
+                        )
+
+                        # Verify that temp file was created and used
+                        mock_temp_file.assert_called_once()
+                        assert result["batch_run_data"] == "/tmp/eval_temp.jsonl"
+                        assert result["target_run"] is None
+
+                        # Verify JSON serialization was called
+                        assert mock_json_dumps.call_count > 0
+
+    def test_dataframe_client_preserves_all_rows_with_failures(self):
+        """Test that DataFrame-based clients preserve all rows including failures."""
+        sample_df = pd.DataFrame(
+            {
+                "query": ["test1", "test2", "test3"],
+                "__outputs.response": [None, "response2", None],  # Mixed success/failure
+            }
+        )
+
+        with patch("azure.ai.evaluation._evaluate._evaluate._apply_target_to_data") as mock_apply_target:
+            with patch("azure.ai.evaluation._evaluate._evaluate._validate_and_load_data") as mock_load_data:
+                mock_load_data.return_value = pd.DataFrame({"query": ["test1", "test2", "test3"]})
+                mock_apply_target.return_value = (sample_df, {"response"}, Mock())
+
+                result = _preprocess_data(
+                    data="/test/path.jsonl",
+                    evaluators_and_graders={"test_eval": _simple_evaluator},
+                    target=_target_with_failures,
+                    _use_run_submitter_client=True,
+                )
+
+                # Verify all rows are preserved in batch_run_data
+                assert isinstance(result["batch_run_data"], pd.DataFrame)
+                assert len(result["batch_run_data"]) == 3
+                assert_frame_equal(result["batch_run_data"], sample_df)
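Note: taken together, the new tests pin down the intended end-to-end behaviour. A usage-style sketch modelled on the fixtures above (the data file name is assumed, and treating failed rows as NaN in the output is the expectation this change establishes rather than a documented guarantee):

```python
# Sketch only: a target that raises for some inputs should no longer shrink
# the result set; failed rows are expected to surface as NaN instead.
from azure.ai.evaluation import evaluate


def target_with_failures(query: str) -> dict:
    # Mirrors _target_with_failures above: some rows raise, the rest succeed.
    if "LV-426" in query:
        raise Exception("Target failure for LV-426")
    return {"response": f"Response to: {query}"}


def length_evaluator(response) -> dict:
    # Mirrors _simple_evaluator above: tolerate missing responses from failed rows.
    return {"length": len(response) if isinstance(response, str) else 0}


result = evaluate(
    data="questions.jsonl",  # assumed JSONL file with a "query" column
    target=target_with_failures,
    evaluators={"length": length_evaluator},
)

# Expected after this change: one result row per input line, with NaN in the
# target output columns for the lines where the target raised.
rows = result["rows"]
```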