Add support for saving/loading evaluators #40983

Closed · wants to merge 5 commits
@@ -3,7 +3,7 @@
# ---------------------------------------------------------
import os
import types
from typing import Optional, Type, Union
from typing import Optional, Type

from azure.ai.evaluation._legacy._adapters._constants import PF_FLOW_ENTRY_IN_TMP, PF_FLOW_META_LOAD_IN_SUBPROCESS
from azure.ai.evaluation._legacy._adapters.utils import ClientUserAgentUtil
@@ -571,7 +571,7 @@ def _apply_target_to_data(
:param data: The path to input jsonl or csv file.
:type data: Union[str, os.PathLike]
:param batch_client: The promptflow client to be used.
:type batch_client: PFClient
:type batch_client: BatchClient
:param initial_data: The data frame with the loaded data.
:type initial_data: pd.DataFrame
:param evaluation_name: The name of the evaluation.
@@ -2,23 +2,16 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import functools
import inspect
import json
import logging
from typing import Callable, Dict, Literal, Optional, Union, cast
from typing import Callable, Dict, Literal

import pandas as pd
from azure.ai.evaluation._legacy._adapters._flows import FlexFlow as flex_flow
from azure.ai.evaluation._legacy._adapters._flows import AsyncPrompty as prompty_sdk
from azure.ai.evaluation._legacy._adapters._flows import Flow as dag_flow
from azure.ai.evaluation._legacy._adapters.client import PFClient
from typing_extensions import ParamSpec

from azure.ai.evaluation._model_configurations import AzureAIProject, EvaluationResult

from ..._user_agent import USER_AGENT
from .._utils import _trace_destination_from_project_scope

LOGGER = logging.getLogger(__name__)

@@ -2,16 +2,8 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

from importlib.util import find_spec
from typing import Final


_has_legacy = False
try:
from promptflow._constants import FlowType

_has_legacy = True
except ImportError:
pass

HAS_LEGACY_SDK: Final[bool] = _has_legacy
MISSING_LEGACY_SDK: Final[bool] = not _has_legacy
HAS_LEGACY_SDK: Final[bool] = find_spec("promptflow") is not None
MISSING_LEGACY_SDK: Final[bool] = not HAS_LEGACY_SDK
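For context, find_spec() answers "is this package importable?" without actually importing it, so the constants above can be evaluated cheaply and without triggering promptflow's import-time side effects. A minimal, generic sketch of the same pattern (numpy is used purely as a stand-in optional dependency, not something this PR relies on):

from importlib.util import find_spec
from typing import Final

# True when the optional dependency is installed; nothing is imported here.
HAS_NUMPY: Final[bool] = find_spec("numpy") is not None

if HAS_NUMPY:
    import numpy as np  # safe: the spec lookup already confirmed the package exists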
@@ -5,6 +5,7 @@
from os import PathLike
from typing import Any, Callable, Dict, Optional, Union
from typing_extensions import TypeAlias
from types import SimpleNamespace

import pandas as pd

@@ -16,10 +17,12 @@
try:
from promptflow.client import PFClient as _PFClient
except ImportError:
from azure.ai.evaluation._legacy._persist._save import save_evaluator

class _PFClient:
def __init__(self, **kwargs):
self._config = Configuration(override_config=kwargs.pop("config", None))
self.flows = SimpleNamespace(save=save_evaluator)

def run(
self,
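With promptflow absent, the fallback _PFClient above now exposes client.flows.save backed by the new save_evaluator, so call sites that go through flows.save keep working either way. A hedged sketch of such a call; the keyword names (entry, path) mirror promptflow's flows.save and are an assumption here, since save_evaluator's actual signature is not shown in this diff:

from azure.ai.evaluation._legacy._adapters.client import PFClient

class AnswerLengthEvaluator:
    """Toy callable-class evaluator used only for illustration."""
    def __call__(self, *, response: str) -> dict:
        return {"length": len(response)}

client = PFClient()
# Persist the evaluator: with promptflow installed this routes to the real
# flows.save, otherwise to save_evaluator (assumed to accept a comparable shape).
client.flows.save(entry=AnswerLengthEvaluator, path="./answer_length_evaluator")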
@@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

from typing import Callable, Dict, Final, Optional
from typing import Callable, Final
from typing_extensions import TypeAlias


@@ -25,4 +25,4 @@
ThreadPoolExecutorWithContext: TypeAlias = _ThreadPoolExecutorWithContext
inject_openai_api: Final[Callable[[], None]] = _inject
recover_openai_api: Final[Callable[[], None]] = _recover
start_trace: Final = _start_trace
start_trace: Final[Callable] = _start_trace
@@ -75,16 +75,6 @@ def __init__(
self.metrics: Mapping[str, Any] = {}
self._run = run

# self._use_remote_flow = False
# self._from_flex_flow = True
# self._from_prompty = False
# self.flow = path to pointless flow file
# self._experiment_name = name of folder containing pointless flow file
# self._lineage_id = basically equivalent to a hex digest of the SHA256 hash of:
# f"{uuid.getnod()}/{posix_full_path_to_pointless_folder}"
# self._output_path = Path("<user_folder>/.promptflow/runs/<self.name>")
# self._flow_name = name of pointless folder

@property
def status(self) -> RunStatus:
return self._status
@@ -26,10 +26,6 @@ class RunSubmitter:
THIS WILL BE REMOVED IN A FUTURE CODE UPDATE"""

def __init__(self, config: BatchEngineConfig, executor: Optional[Executor] = None):
# self._client = PFClient instance
# self._config = PFClient config
# self.run_operations = RunOperations instance

# TODO ralphe: Use proper logger here. Old code did LoggerFactory.get_logger(__name__)
self._config = config
self._executor = executor
@@ -0,0 +1,194 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

from dataclasses import fields, is_dataclass, MISSING
from enum import Enum, EnumMeta
from inspect import Parameter, Signature, isclass, signature
from typing import (
    Mapping,
    Optional,
    Sequence,
    Type,
    TypedDict,
    Union,
    Dict,
    Any,
    Callable,
    cast,
    get_args,
    get_origin,
    get_type_hints
)


class TypeMetadata(TypedDict):
    """Metadata describing a single argument, field, or entry.

    :param str name: The name of the argument/field/entry.
    :param Optional[Type] type: The Python type of the argument/field/entry.
    :param Optional[ValueType] value_type: The ValueType corresponding to the Python type.
    :param Any default: The default value, or Parameter.empty if no default is specified.
    """
    name: str
    """The name of the argument/field/entry"""
    type: Optional[Type]
    """The type of the argument/field/entry"""
    value_type: Optional["ValueType"]
    """The value type of the argument/field/entry. This is used to determine the type of the value."""
    default: Union[Any, Parameter.empty]
    """The default value of the argument/field/entry. This uses Parameter.empty to indicate that the
    default value is not specified."""


class ValueType(Enum):
    """Value types."""
    INT = "int"
    DOUBLE = "double"
    BOOL = "boolean"
    STRING = "string"
    LIST = "array"
    OBJECT = "object"

    @staticmethod
    def from_type(t: Optional[Type]) -> Optional["ValueType"]:
        """Parse a type into a corresponding ValueType.

        :param Optional[Type] t: The type to parse.
        :return: The corresponding ValueType of the given type, or None if the type is not supported.
        :rtype: Optional[ValueType]
        """

        if t is Parameter.empty:
            return ValueType.OBJECT

        def type_or_subclass(t: Type, cls: Type) -> bool:
            return t is cls or (isclass(t) and issubclass(t, cls))

        t = ValueType.resolve_type(cast(Type, t))
        if t is type(None): return ValueType.OBJECT
        # bool must be checked before int since bool is a subclass of int
        if type_or_subclass(t, bool): return ValueType.BOOL
        if type_or_subclass(t, int): return ValueType.INT
        if type_or_subclass(t, float): return ValueType.DOUBLE
        if type_or_subclass(t, str): return ValueType.STRING
        if type_or_subclass(t, Enum) or type_or_subclass(t, EnumMeta):
            return ValueType.STRING
        if type_or_subclass(t, list) or type_or_subclass(t, Sequence):
            return ValueType.LIST
        if type_or_subclass(t, dict) or type_or_subclass(t, Mapping):
            return ValueType.OBJECT
        if t is Any:
            return ValueType.OBJECT

        return None

    @staticmethod
    def resolve_type(t: Optional[Type]) -> Type:
        """Resolve a type to its base type. For example, if the type is List[int], it will be resolved to
        List. If the type is Optional[int], it will be resolved to int.

        :param Optional[Type] t: The type to resolve.
        :return: The resolved type.
        :rtype: Type"""

        origin = get_origin(t)
        if origin is None:
            return t or type(None)

        if origin is Union:
            # Handle Optional[T] which is Union[T, None] by removing NoneType
            types = [arg for arg in get_args(t) if arg is not type(None)]
            if len(types) != 1:
                raise ValueError("Only optional unions (aka Union[X, None]) are supported")
            return types[0]
        else:
            return origin


def extract_type_metadata(item: Any, **kwargs) -> Mapping[str, TypeMetadata]:
    """Extracts metadata from a type or callable.

    :param Any item: The type or callable to extract metadata from.
    :return: A mapping from name to metadata. Will be empty if no metadata is found.
    :rtype: Mapping[str, TypeMetadata]
    """

    if item is None or item is Parameter.empty:
        return {}

    # NOTE: We use get_type_hints to get the type hints because:
    # - It handles forward references and string annotations
    # - It works for classes, functions, dataclasses, TypedDicts, and so on
    # - It handles nested types and generics
    # The downsides are that it doesn't handle fields/arguments without type hints (they are
    # excluded from the metadata dictionary it generates), and that it doesn't handle default values
    type_hints: Mapping[str, Type] = get_type_hints(
        item,
        globalns=kwargs.pop("globalns", None),
        localns=kwargs.pop("localns", None))

    if is_dataclass(item):
        return {
            f.name: {
                "name": f.name,
                "type": f.type if isinstance(f.type, type) else type_hints.get(f.name, None),
                "value_type": ValueType.from_type(f.type) if isinstance(f.type, type) else None,
                "default": f.default if f.default != MISSING else Parameter.empty,
            }
            for f in fields(item)
        }
    elif isinstance(item, Callable) and not _is_dict(item):
        def get_type(name: str, param: Parameter) -> Optional[Type]:
            resolved = type_hints.get(name, param.annotation)
            if param.default is not Parameter.empty and resolved is Parameter.empty:
                resolved = param.default.__class__ if isinstance(param.default, Enum) else type(param.default)
            return resolved if resolved is not Parameter.empty else None

        try:
            sig: Signature = signature(item)
            items: Dict[str, TypeMetadata] = {
                name: {
                    "name": name,
                    "type": (item_type := get_type(name, param)),
                    "value_type": ValueType.from_type(item_type),
                    "default": param.default,
                }
                for name, param in sig.parameters.items()
                if name not in ["self", "cls"] and param.kind not in [param.VAR_POSITIONAL, param.VAR_KEYWORD]
            }

            # also handle the return type
            items.update({
                "return": {
                    "name": "return",
                    "type": (item_type := type_hints.get("return", sig.return_annotation)),
                    "value_type": ValueType.from_type(item_type),
                    "default": Parameter.empty,
                }
            })
            return items
        except ValueError:
            # If the signature cannot be resolved, we fall back to the type hints
            pass

    return {
        k: {
            "name": k,
            "type": v,
            "value_type": ValueType.from_type(v),
            "default": Parameter.empty,
        }
        for k, v in type_hints.items()
    }

def _is_dict(t: Any) -> bool:
    """Check if the type is a dict.

    :param Any t: The type to check.
    :return: True if the type is a dict, False otherwise.
    :rtype: bool
    """
    return (get_origin(t) is dict
            or (isclass(t) and issubclass(t, dict))
            or (isclass(t) and issubclass(t, Mapping))
            or isinstance(t, dict))
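To make the new metadata helpers concrete, here is a small worked example of what extract_type_metadata and ValueType produce for a plain evaluator-style function. It assumes ValueType and extract_type_metadata are imported from this new module (whose path is not shown in the diff); the evaluator itself is a throwaway illustration:

from inspect import Parameter
from typing import List, Optional

def relevance_evaluator(query: str, response: str, threshold: float = 0.5) -> dict:
    """Toy evaluator used only to show the extracted metadata."""
    return {"relevant": float(len(response) > 10) >= threshold}

meta = extract_type_metadata(relevance_evaluator)
# meta["query"]     == {"name": "query", "type": str, "value_type": ValueType.STRING, "default": Parameter.empty}
# meta["threshold"] == {"name": "threshold", "type": float, "value_type": ValueType.DOUBLE, "default": 0.5}
# meta["return"]    == {"name": "return", "type": dict, "value_type": ValueType.OBJECT, "default": Parameter.empty}

# The ValueType helpers on their own:
assert ValueType.from_type(Optional[int]) is ValueType.INT  # Optional[T] is unwrapped to T
assert ValueType.resolve_type(List[str]) is list            # generic aliases resolve to their origin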
@@ -0,0 +1,15 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

from ._exceptions import EvaluationSaveError, EvaluationLoadError
from ._loaded_evaluator import LoadedEvaluator
from ._save import save_evaluator, load_evaluator

__all__ = [
    "EvaluationSaveError",
    "EvaluationLoadError",
    "LoadedEvaluator",
    "save_evaluator",
    "load_evaluator",
]
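The package's public surface is the save/load pair plus LoadedEvaluator and the two exception types. The diff does not show the function signatures, so the round trip below is only a hedged sketch: the call shapes (a callable plus a target directory, then loading back from that directory and invoking the result) are assumptions based on the exported names:

from azure.ai.evaluation._legacy._persist import (
    EvaluationLoadError,
    EvaluationSaveError,
    load_evaluator,
    save_evaluator,
)

def fluency_evaluator(*, response: str) -> dict:
    """Toy evaluator used only for illustration."""
    return {"fluency": min(5.0, len(response.split()) / 10)}

try:
    save_evaluator(fluency_evaluator, "./fluency_evaluator")  # assumed signature
    restored = load_evaluator("./fluency_evaluator")          # assumed signature
    print(restored(response="The quick brown fox jumps over the lazy dog."))
except (EvaluationSaveError, EvaluationLoadError) as exc:
    print(f"Persisting the evaluator failed: {exc}")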
@@ -0,0 +1,28 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

from typing import TypedDict, Dict, Any
from typing_extensions import NotRequired


class CallableArgMetadata(TypedDict):
    """Metadata for a callable argument or output."""
    type: str
    """The type of the argument."""
    default: NotRequired[str]
    """Default value of the argument (optional)."""


class CallableMetadata(TypedDict):
    """Metadata for a callable."""
    entry: str
    """The entry point of the callable. This is typically the function name, or ClassName:FunctionName."""
    inputs: NotRequired[Dict[str, CallableArgMetadata]]
    """The input arguments for the callable."""
    outputs: NotRequired[Dict[str, CallableArgMetadata]]
    """The outputs for the callable."""
    init: NotRequired[Dict[str, CallableArgMetadata]]
    """The arguments for the callable's constructor (if applicable)."""
    environment: NotRequired[Dict[str, Any]]
    """Any environment variables or settings for the callable."""
@@ -0,0 +1,27 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

from azure.ai.evaluation._exceptions import EvaluationException, ErrorBlame, ErrorCategory


class EvaluationSaveError(EvaluationException):
    """Custom exception for errors during the evaluation save process.

    :param str message: The error message."""

    def __init__(self, message: str, **kwargs) -> None:
        kwargs.setdefault("category", ErrorCategory.INVALID_VALUE)
        kwargs.setdefault("blame", ErrorBlame.USER_ERROR)
        super().__init__(message, **kwargs)


class EvaluationLoadError(EvaluationException):
    """Custom exception for errors during the evaluation load process.

    :param str message: The error message."""

    def __init__(self, message: str, **kwargs) -> None:
        kwargs.setdefault("category", ErrorCategory.INVALID_VALUE)
        kwargs.setdefault("blame", ErrorBlame.USER_ERROR)
        super().__init__(message or "evaluator must be a function or a callable class", **kwargs)
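Both exception types default to ErrorCategory.INVALID_VALUE and ErrorBlame.USER_ERROR but pass any remaining keyword arguments through to EvaluationException, so callers can override those defaults. A minimal sketch of raising the save-side error from a hypothetical validation helper (not part of this PR):

from azure.ai.evaluation._exceptions import ErrorBlame
from azure.ai.evaluation._legacy._persist import EvaluationSaveError

def _ensure_callable(evaluator) -> None:
    """Hypothetical guard used only for illustration."""
    if not callable(evaluator):
        raise EvaluationSaveError(
            f"Cannot save {type(evaluator).__name__}: evaluators must be callable.",
            blame=ErrorBlame.USER_ERROR,  # explicit here, though it is already the default
        )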