diff --git a/CHANGELOG.md b/CHANGELOG.md index 04e326f..4975564 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.0.9 + +* **Support optionally exposing additional metadata around FileData** + ## 0.0.8 * **reduce usage data log level** We do not want to have so much verbosity for something that might happen a lot diff --git a/test/assets/filedata_meta.py b/test/assets/filedata_meta.py new file mode 100644 index 0000000..55ebd0a --- /dev/null +++ b/test/assets/filedata_meta.py @@ -0,0 +1,33 @@ +import math +from typing import Optional + +from pydantic import BaseModel +from unstructured_ingest.v2.interfaces import FileData + +from unstructured_platform_plugins.schema import FileDataMeta, NewRecord + + +class Input(BaseModel): + m: int + + +class Output(BaseModel): + n: float + + +def process_input(i: Input, file_data: FileData, filedata_meta: FileDataMeta) -> Optional[Output]: + if i.m > 10: + filedata_meta.terminate_current = True + new_content = [ + NewRecord( + file_data=FileData( + identifier=str(i.m + x), connector_type=file_data.connector_type + ), + contents=Output(n=float(i.m + x)), + ) + for x in range(5) + ] + filedata_meta.new_records.extend(new_content) + return None + else: + return Output(n=math.pi + i.m) diff --git a/test/test_schema.py b/test/test_schema.py index 913bee9..fef8951 100644 --- a/test/test_schema.py +++ b/test/test_schema.py @@ -560,10 +560,64 @@ def fn(a: FileData) -> list[FileData]: }, }, "required": [], + "default": { + "type": "object", + "properties": { + "url": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "default": None, + }, + "version": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "default": None, + }, + "record_locator": { + "anyOf": [ + { + "type": "object", + "items": {"key": {"type": "string"}, "value": {}}, + }, + {"type": "null"}, + ], + "default": None, + }, + "date_created": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "default": None, + }, + "date_modified": { + "anyOf": 
[{"type": "string"}, {"type": "null"}], + "default": None, + }, + "date_processed": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "default": None, + }, + "permissions_data": { + "anyOf": [ + { + "type": "array", + "items": { + "type": "object", + "items": {"key": {"type": "string"}, "value": {}}, + }, + }, + {"type": "null"}, + ], + "default": None, + }, + "filesize_bytes": { + "anyOf": [{"type": "integer"}, {"type": "null"}], + "default": None, + }, + }, + "required": [], + }, }, "additional_metadata": { "type": "object", "items": {"key": {"type": "string"}, "value": {}}, + "default": {}, }, "reprocess": {"type": "boolean", "default": False}, "local_download_path": { @@ -571,7 +625,7 @@ def fn(a: FileData) -> list[FileData]: "default": None, }, }, - "required": ["identifier", "connector_type", "metadata", "additional_metadata"], + "required": ["identifier", "connector_type"], } }, } @@ -657,10 +711,64 @@ def fn(a: FileData) -> list[FileData]: }, }, "required": [], + "default": { + "type": "object", + "properties": { + "url": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "default": None, + }, + "version": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "default": None, + }, + "record_locator": { + "anyOf": [ + { + "type": "object", + "items": {"key": {"type": "string"}, "value": {}}, + }, + {"type": "null"}, + ], + "default": None, + }, + "date_created": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "default": None, + }, + "date_modified": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "default": None, + }, + "date_processed": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "default": None, + }, + "permissions_data": { + "anyOf": [ + { + "type": "array", + "items": { + "type": "object", + "items": {"key": {"type": "string"}, "value": {}}, + }, + }, + {"type": "null"}, + ], + "default": None, + }, + "filesize_bytes": { + "anyOf": [{"type": "integer"}, {"type": "null"}], + "default": None, + }, + }, + "required": [], + 
}, }, "additional_metadata": { "type": "object", "items": {"key": {"type": "string"}, "value": {}}, + "default": {}, }, "reprocess": {"type": "boolean", "default": False}, "local_download_path": { @@ -668,9 +776,8 @@ def fn(a: FileData) -> list[FileData]: "default": None, }, }, - "required": ["identifier", "connector_type", "metadata", "additional_metadata"], + "required": ["identifier", "connector_type"], }, } - assert output_schema == expected_output_schema assert is_valid_response_dict(output_schema) diff --git a/test/test_utils.py b/test/test_utils.py index ab054da..bd48316 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -136,5 +136,5 @@ def fn(a: FileData) -> None: inputs = {"a": {"not": "the", "right": "values"}} - with pytest.raises(KeyError): + with pytest.raises(ValueError): utils.map_inputs(func=fn, raw_inputs=inputs) diff --git a/unstructured_platform_plugins/__version__.py b/unstructured_platform_plugins/__version__.py index 0e12f90..2215aa1 100644 --- a/unstructured_platform_plugins/__version__.py +++ b/unstructured_platform_plugins/__version__.py @@ -1 +1 @@ -__version__ = "0.0.8" # pragma: no cover +__version__ = "0.0.9" # pragma: no cover diff --git a/unstructured_platform_plugins/etl_uvicorn/api_generator.py b/unstructured_platform_plugins/etl_uvicorn/api_generator.py index c25b51f..d3d109b 100644 --- a/unstructured_platform_plugins/etl_uvicorn/api_generator.py +++ b/unstructured_platform_plugins/etl_uvicorn/api_generator.py @@ -4,13 +4,14 @@ import json import logging from functools import partial -from typing import Any, Callable, Optional +from typing import Any, Callable, Optional, Union from fastapi import FastAPI, status from fastapi.responses import StreamingResponse from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from pydantic import BaseModel +from pydantic import BaseModel, Field, create_model from starlette.responses import RedirectResponse +from unstructured_ingest.v2.interfaces import FileData from 
uvicorn.config import LOG_LEVELS from uvicorn.importer import import_from_string @@ -23,11 +24,10 @@ get_schema_dict, map_inputs, ) -from unstructured_platform_plugins.exceptions import UnrecoverableException +from unstructured_platform_plugins.schema import FileDataMeta, NewRecord, UsageData from unstructured_platform_plugins.schema.json_schema import ( schema_to_base_model, ) -from unstructured_platform_plugins.schema.usage import UsageData logger = logging.getLogger("uvicorn.error") @@ -68,6 +68,30 @@ def check_precheck_func(precheck_func: Callable): raise ValueError(f"no output should exist for precheck function, found: {outputs}") +def is_optional(t: Any) -> bool: + return ( + hasattr(t, "__origin__") + and t.__origin__ is Union + and hasattr(t, "__args__") + and type(None) in t.__args__ + ) + + +def update_filedata_model(new_type) -> BaseModel: + field_info = NewRecord.model_fields["contents"] + if is_optional(new_type): + field_info.default = None + new_record_model = create_model( + NewRecord.__name__, contents=(new_type, field_info), __base__=NewRecord + ) + new_filedata_model = create_model( + FileDataMeta.__name__, + new_records=(list[new_record_model], Field(default_factory=list)), + __base__=FileDataMeta, + ) + return new_filedata_model + + def wrap_in_fastapi( func: Callable, plugin_id: str, @@ -81,14 +105,16 @@ def wrap_in_fastapi( fastapi_app = FastAPI() response_type = get_output_sig(func) + filedata_meta_model = update_filedata_model(response_type) class InvokeResponse(BaseModel): usage: list[UsageData] status_code: int + filedata_meta: filedata_meta_model status_code_text: Optional[str] = None output: Optional[response_type] = None - input_schema = get_input_schema(func, omit=["usage"]) + input_schema = get_input_schema(func, omit=["usage", "filedata_meta"]) input_schema_model = schema_to_base_model(input_schema) logging.getLogger("etl_uvicorn.fastapi") @@ -97,11 +123,14 @@ class InvokeResponse(BaseModel): async def wrap_fn(func: Callable, 
kwargs: Optional[dict[str, Any]] = None) -> ResponseType: usage: list[UsageData] = [] + filedata_meta = FileDataMeta() request_dict = kwargs if kwargs else {} if "usage" in inspect.signature(func).parameters: request_dict["usage"] = usage else: - logger.debug("usage data not an expected parameter, omitting") + logger.warning("usage data not an expected parameter, omitting") + if "filedata_meta" in inspect.signature(func).parameters: + request_dict["filedata_meta"] = filedata_meta try: if inspect.isasyncgenfunction(func): # Stream response if function is an async generator @@ -109,24 +138,28 @@ async def wrap_fn(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> Re async def _stream_response(): async for output in func(**(request_dict or {})): yield InvokeResponse( - usage=usage, status_code=status.HTTP_200_OK, output=output + usage=usage, + filedata_meta=filedata_meta_model.model_validate( + filedata_meta.model_dump() + ), + status_code=status.HTTP_200_OK, + output=output, ).model_dump_json() + "\n" return StreamingResponse(_stream_response(), media_type="application/x-ndjson") else: - try: - output = await invoke_func(func=func, kwargs=request_dict) - return InvokeResponse( - usage=usage, status_code=status.HTTP_200_OK, output=output - ) - except UnrecoverableException as ex: - # Thrower of this exception is responsible for logging necessary information - logger.info("Unrecoverable error occurred during plugin invocation") - return InvokeResponse(usage=usage, status_code=512, status_code_text=ex.message) + output = await invoke_func(func=func, kwargs=request_dict) + return InvokeResponse( + usage=usage, + filedata_meta=filedata_meta_model.model_validate(filedata_meta.model_dump()), + status_code=status.HTTP_200_OK, + output=output, + ) except Exception as invoke_error: logger.error(f"failed to invoke plugin: {invoke_error}", exc_info=True) return InvokeResponse( usage=usage, + filedata_meta=filedata_meta_model.model_validate(filedata_meta.model_dump()), 
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code_text=f"failed to invoke plugin: " f"[{invoke_error.__class__.__name__}] {invoke_error}", @@ -139,6 +172,11 @@ async def run_job(request: input_schema_model) -> ResponseType: log_func_and_body(func=func, body=request.json()) # Create dictionary from pydantic model while preserving underlying types request_dict = {f: getattr(request, f) for f in request.model_fields} + # Map FileData back to original dataclass if present + if "file_data" in request_dict: + request_dict["file_data"] = FileData.from_dict( + request_dict["file_data"].model_dump() + ) map_inputs(func=func, raw_inputs=request_dict) if logger.level == LOG_LEVELS.get("trace", logging.NOTSET): logger.log(level=logger.level, msg=f"passing inputs to function: {request_dict}") diff --git a/unstructured_platform_plugins/etl_uvicorn/otel.py b/unstructured_platform_plugins/etl_uvicorn/otel.py index f0ad35e..5eae4e4 100644 --- a/unstructured_platform_plugins/etl_uvicorn/otel.py +++ b/unstructured_platform_plugins/etl_uvicorn/otel.py @@ -42,7 +42,6 @@ def get_settings() -> OtelSettings: def get_trace_provider() -> TracerProvider: settings = get_settings() - print(settings) provider = TracerProvider(resource=Resource({SERVICE_NAME: settings["service_name"]})) for trace_exporter_type in settings["trace_exporters"]: diff --git a/unstructured_platform_plugins/etl_uvicorn/utils.py b/unstructured_platform_plugins/etl_uvicorn/utils.py index 4ddbe94..d391d82 100644 --- a/unstructured_platform_plugins/etl_uvicorn/utils.py +++ b/unstructured_platform_plugins/etl_uvicorn/utils.py @@ -16,6 +16,10 @@ from unstructured_platform_plugins.type_hints import get_type_hints +def is_optional(t: Any) -> bool: + return hasattr(t, "__origin__") and t.__origin__ is not None + + def get_func(instance: Any, method_name: Optional[str] = None) -> Callable: method_name = method_name or "__call__" if inspect.isfunction(instance): @@ -111,10 +115,10 @@ def map_inputs(func: Callable, 
raw_inputs: dict[str, Any]) -> dict[str, Any]: and not isinstance(type_data, GenericAlias) and issubclass(type_data, BaseModel) ): - raw_inputs[field_name] = type_data.model_validate(raw_inputs[field_name]) + field_value = raw_inputs[field_name] + if isinstance(field_value, BaseModel): + field_value = field_value.model_dump() + raw_inputs[field_name] = type_data.model_validate(field_value) except Exception as e: - exception_type = type(e) - raise exception_type( - f"failed to map input for field {field_name}: {field_value}" - ) from e + raise ValueError(f"failed to map input for field {field_name}: {field_value}") from e return raw_inputs diff --git a/unstructured_platform_plugins/schema/__init__.py b/unstructured_platform_plugins/schema/__init__.py index e69de29..5c7934d 100644 --- a/unstructured_platform_plugins/schema/__init__.py +++ b/unstructured_platform_plugins/schema/__init__.py @@ -0,0 +1,4 @@ +from .filedata_meta import FileDataMeta, NewRecord +from .usage import UsageData + +__all__ = ["UsageData", "FileDataMeta", "NewRecord"] diff --git a/unstructured_platform_plugins/schema/filedata_meta.py b/unstructured_platform_plugins/schema/filedata_meta.py new file mode 100644 index 0000000..281ca17 --- /dev/null +++ b/unstructured_platform_plugins/schema/filedata_meta.py @@ -0,0 +1,14 @@ +from typing import Any + +from pydantic import BaseModel, Field +from unstructured_ingest.v2.interfaces import FileData + + +class NewRecord(BaseModel): + file_data: FileData + contents: Any + + +class FileDataMeta(BaseModel): + terminate_current: bool = False + new_records: list[NewRecord] = Field(default_factory=list) diff --git a/unstructured_platform_plugins/schema/json_schema.py b/unstructured_platform_plugins/schema/json_schema.py index d2db30c..f581d19 100644 --- a/unstructured_platform_plugins/schema/json_schema.py +++ b/unstructured_platform_plugins/schema/json_schema.py @@ -113,6 +113,12 @@ def dataclass_to_json_schema(class_or_instance: Any) -> dict: f_resp = 
to_json_schema(t) if f.default is not MISSING: f_resp["default"] = f.default.value if isinstance(f.default, Enum) else f.default + elif f.default_factory is not MISSING: + default = f.default_factory() + try: + f_resp["default"] = to_json_schema(default) + except Exception: + f_resp["default"] = default else: required.append(f.name) properties[f.name] = f_resp @@ -339,6 +345,10 @@ def schema_to_base_model(schema: dict, name: str = "reconstructed_model") -> Typ t = schema_to_base_model_type( json_type_name=json_type_name, name=k, type_info=entry_info ) + elif "properties" in v: + t = schema_to_base_model(schema=v, name=k) + elif not v: + t = schema_to_base_model(schema={"type": "null"}, name=k) else: json_type_name = v["type"] t = schema_to_base_model_type(json_type_name=json_type_name, name=k, type_info=v)