Skip to content

Commit f89d2a6

Browse files
authored
feat/filedata metadata support (#26)
1 parent 20bb20f commit f89d2a6

File tree

11 files changed

+240
-27
lines changed

11 files changed

+240
-27
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 0.0.9
2+
3+
* **Support optionally exposing additional metadata around FileData**
4+
15
## 0.0.8
26

37
* **reduce usage data log level** We do not want to have so much verbosity for something that might happen a lot

test/assets/filedata_meta.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import math
2+
from typing import Optional
3+
4+
from pydantic import BaseModel
5+
from unstructured_ingest.v2.interfaces import FileData
6+
7+
from unstructured_platform_plugins.schema import FileDataMeta, NewRecord
8+
9+
10+
class Input(BaseModel):
11+
m: int
12+
13+
14+
class Output(BaseModel):
15+
n: float
16+
17+
18+
def process_input(i: Input, file_data: FileData, filedata_meta: FileDataMeta) -> Optional[Output]:
    """Test plugin exercising the ``filedata_meta`` side channel.

    For ``i.m <= 10`` an ``Output`` holding ``math.pi + i.m`` is returned and
    ``filedata_meta`` is left untouched.

    For ``i.m > 10`` nothing is returned directly: ``terminate_current`` is set
    on ``filedata_meta`` and five ``NewRecord`` entries (one per offset 0..4)
    are appended to ``filedata_meta.new_records``, each carrying a fresh
    ``FileData`` that reuses the incoming connector type.
    """
    # Small inputs: plain return value, no metadata changes.
    if not i.m > 10:
        return Output(n=math.pi + i.m)

    filedata_meta.terminate_current = True

    # Build the full batch first so new_records is only touched once every
    # record has been constructed successfully.
    spawned = []
    for offset in range(5):
        record_file_data = FileData(
            identifier=str(i.m + offset), connector_type=file_data.connector_type
        )
        record_contents = Output(n=float(i.m + offset))
        spawned.append(NewRecord(file_data=record_file_data, contents=record_contents))
    filedata_meta.new_records.extend(spawned)
    return None

test/test_schema.py

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -560,18 +560,72 @@ def fn(a: FileData) -> list[FileData]:
560560
},
561561
},
562562
"required": [],
563+
"default": {
564+
"type": "object",
565+
"properties": {
566+
"url": {
567+
"anyOf": [{"type": "string"}, {"type": "null"}],
568+
"default": None,
569+
},
570+
"version": {
571+
"anyOf": [{"type": "string"}, {"type": "null"}],
572+
"default": None,
573+
},
574+
"record_locator": {
575+
"anyOf": [
576+
{
577+
"type": "object",
578+
"items": {"key": {"type": "string"}, "value": {}},
579+
},
580+
{"type": "null"},
581+
],
582+
"default": None,
583+
},
584+
"date_created": {
585+
"anyOf": [{"type": "string"}, {"type": "null"}],
586+
"default": None,
587+
},
588+
"date_modified": {
589+
"anyOf": [{"type": "string"}, {"type": "null"}],
590+
"default": None,
591+
},
592+
"date_processed": {
593+
"anyOf": [{"type": "string"}, {"type": "null"}],
594+
"default": None,
595+
},
596+
"permissions_data": {
597+
"anyOf": [
598+
{
599+
"type": "array",
600+
"items": {
601+
"type": "object",
602+
"items": {"key": {"type": "string"}, "value": {}},
603+
},
604+
},
605+
{"type": "null"},
606+
],
607+
"default": None,
608+
},
609+
"filesize_bytes": {
610+
"anyOf": [{"type": "integer"}, {"type": "null"}],
611+
"default": None,
612+
},
613+
},
614+
"required": [],
615+
},
563616
},
564617
"additional_metadata": {
565618
"type": "object",
566619
"items": {"key": {"type": "string"}, "value": {}},
620+
"default": {},
567621
},
568622
"reprocess": {"type": "boolean", "default": False},
569623
"local_download_path": {
570624
"anyOf": [{"type": "string"}, {"type": "null"}],
571625
"default": None,
572626
},
573627
},
574-
"required": ["identifier", "connector_type", "metadata", "additional_metadata"],
628+
"required": ["identifier", "connector_type"],
575629
}
576630
},
577631
}
@@ -657,20 +711,73 @@ def fn(a: FileData) -> list[FileData]:
657711
},
658712
},
659713
"required": [],
714+
"default": {
715+
"type": "object",
716+
"properties": {
717+
"url": {
718+
"anyOf": [{"type": "string"}, {"type": "null"}],
719+
"default": None,
720+
},
721+
"version": {
722+
"anyOf": [{"type": "string"}, {"type": "null"}],
723+
"default": None,
724+
},
725+
"record_locator": {
726+
"anyOf": [
727+
{
728+
"type": "object",
729+
"items": {"key": {"type": "string"}, "value": {}},
730+
},
731+
{"type": "null"},
732+
],
733+
"default": None,
734+
},
735+
"date_created": {
736+
"anyOf": [{"type": "string"}, {"type": "null"}],
737+
"default": None,
738+
},
739+
"date_modified": {
740+
"anyOf": [{"type": "string"}, {"type": "null"}],
741+
"default": None,
742+
},
743+
"date_processed": {
744+
"anyOf": [{"type": "string"}, {"type": "null"}],
745+
"default": None,
746+
},
747+
"permissions_data": {
748+
"anyOf": [
749+
{
750+
"type": "array",
751+
"items": {
752+
"type": "object",
753+
"items": {"key": {"type": "string"}, "value": {}},
754+
},
755+
},
756+
{"type": "null"},
757+
],
758+
"default": None,
759+
},
760+
"filesize_bytes": {
761+
"anyOf": [{"type": "integer"}, {"type": "null"}],
762+
"default": None,
763+
},
764+
},
765+
"required": [],
766+
},
660767
},
661768
"additional_metadata": {
662769
"type": "object",
663770
"items": {"key": {"type": "string"}, "value": {}},
771+
"default": {},
664772
},
665773
"reprocess": {"type": "boolean", "default": False},
666774
"local_download_path": {
667775
"anyOf": [{"type": "string"}, {"type": "null"}],
668776
"default": None,
669777
},
670778
},
671-
"required": ["identifier", "connector_type", "metadata", "additional_metadata"],
779+
"required": ["identifier", "connector_type"],
672780
},
673781
}
674-
675782
assert output_schema == expected_output_schema
676783
assert is_valid_response_dict(output_schema)

test/test_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,5 +136,5 @@ def fn(a: FileData) -> None:
136136

137137
inputs = {"a": {"not": "the", "right": "values"}}
138138

139-
with pytest.raises(KeyError):
139+
with pytest.raises(ValueError):
140140
utils.map_inputs(func=fn, raw_inputs=inputs)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.0.8" # pragma: no cover
1+
__version__ = "0.0.9" # pragma: no cover

unstructured_platform_plugins/etl_uvicorn/api_generator.py

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44
import json
55
import logging
66
from functools import partial
7-
from typing import Any, Callable, Optional
7+
from typing import Any, Callable, Optional, Union
88

99
from fastapi import FastAPI, status
1010
from fastapi.responses import StreamingResponse
1111
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
12-
from pydantic import BaseModel
12+
from pydantic import BaseModel, Field, create_model
1313
from starlette.responses import RedirectResponse
14+
from unstructured_ingest.v2.interfaces import FileData
1415
from uvicorn.config import LOG_LEVELS
1516
from uvicorn.importer import import_from_string
1617

@@ -23,11 +24,10 @@
2324
get_schema_dict,
2425
map_inputs,
2526
)
26-
from unstructured_platform_plugins.exceptions import UnrecoverableException
27+
from unstructured_platform_plugins.schema import FileDataMeta, NewRecord, UsageData
2728
from unstructured_platform_plugins.schema.json_schema import (
2829
schema_to_base_model,
2930
)
30-
from unstructured_platform_plugins.schema.usage import UsageData
3131

3232
logger = logging.getLogger("uvicorn.error")
3333

@@ -68,6 +68,30 @@ def check_precheck_func(precheck_func: Callable):
6868
raise ValueError(f"no output should exist for precheck function, found: {outputs}")
6969

7070

71+
def is_optional(t: Any) -> bool:
72+
return (
73+
hasattr(t, "__origin__")
74+
and t.__origin__ is Union
75+
and hasattr(t, "__args__")
76+
and type(None) in t.__args__
77+
)
78+
79+
80+
def update_filedata_model(new_type) -> type[BaseModel]:
    """Build a ``FileDataMeta`` model whose new records carry ``new_type``.

    A subclass of ``NewRecord`` is created with its ``contents`` field
    re-typed to ``new_type``, then a subclass of ``FileDataMeta`` is created
    whose ``new_records`` field is a list of that specialised record model.

    Args:
        new_type: The annotation to use for ``NewRecord.contents``. When it is
            optional (per ``is_optional``), the field defaults to ``None``.

    Returns:
        The dynamically created ``FileDataMeta`` subclass (a model class, not
        an instance).
    """
    from copy import copy

    # Work on a copy: the FieldInfo stored in NewRecord.model_fields is shared
    # class-level state, and assigning the default on it in place would leak
    # into NewRecord itself and into every later call of this function.
    field_info = copy(NewRecord.model_fields["contents"])
    if is_optional(new_type):
        field_info.default = None
    new_record_model = create_model(
        NewRecord.__name__, contents=(new_type, field_info), __base__=NewRecord
    )
    new_filedata_model = create_model(
        FileDataMeta.__name__,
        new_records=(list[new_record_model], Field(default_factory=list)),
        __base__=FileDataMeta,
    )
    return new_filedata_model
93+
94+
7195
def wrap_in_fastapi(
7296
func: Callable,
7397
plugin_id: str,
@@ -81,14 +105,16 @@ def wrap_in_fastapi(
81105
fastapi_app = FastAPI()
82106

83107
response_type = get_output_sig(func)
108+
filedata_meta_model = update_filedata_model(response_type)
84109

85110
class InvokeResponse(BaseModel):
86111
usage: list[UsageData]
87112
status_code: int
113+
filedata_meta: filedata_meta_model
88114
status_code_text: Optional[str] = None
89115
output: Optional[response_type] = None
90116

91-
input_schema = get_input_schema(func, omit=["usage"])
117+
input_schema = get_input_schema(func, omit=["usage", "filedata_meta"])
92118
input_schema_model = schema_to_base_model(input_schema)
93119

94120
logging.getLogger("etl_uvicorn.fastapi")
@@ -97,36 +123,43 @@ class InvokeResponse(BaseModel):
97123

98124
async def wrap_fn(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> ResponseType:
99125
usage: list[UsageData] = []
126+
filedata_meta = FileDataMeta()
100127
request_dict = kwargs if kwargs else {}
101128
if "usage" in inspect.signature(func).parameters:
102129
request_dict["usage"] = usage
103130
else:
104-
logger.debug("usage data not an expected parameter, omitting")
131+
logger.warning("usage data not an expected parameter, omitting")
132+
if "filedata_meta" in inspect.signature(func).parameters:
133+
request_dict["filedata_meta"] = filedata_meta
105134
try:
106135
if inspect.isasyncgenfunction(func):
107136
# Stream response if function is an async generator
108137

109138
async def _stream_response():
110139
async for output in func(**(request_dict or {})):
111140
yield InvokeResponse(
112-
usage=usage, status_code=status.HTTP_200_OK, output=output
141+
usage=usage,
142+
filedata_meta=filedata_meta_model.model_validate(
143+
filedata_meta.model_dump()
144+
),
145+
status_code=status.HTTP_200_OK,
146+
output=output,
113147
).model_dump_json() + "\n"
114148

115149
return StreamingResponse(_stream_response(), media_type="application/x-ndjson")
116150
else:
117-
try:
118-
output = await invoke_func(func=func, kwargs=request_dict)
119-
return InvokeResponse(
120-
usage=usage, status_code=status.HTTP_200_OK, output=output
121-
)
122-
except UnrecoverableException as ex:
123-
# Thrower of this exception is responsible for logging necessary information
124-
logger.info("Unrecoverable error occurred during plugin invocation")
125-
return InvokeResponse(usage=usage, status_code=512, status_code_text=ex.message)
151+
output = await invoke_func(func=func, kwargs=request_dict)
152+
return InvokeResponse(
153+
usage=usage,
154+
filedata_meta=filedata_meta_model.model_validate(filedata_meta.model_dump()),
155+
status_code=status.HTTP_200_OK,
156+
output=output,
157+
)
126158
except Exception as invoke_error:
127159
logger.error(f"failed to invoke plugin: {invoke_error}", exc_info=True)
128160
return InvokeResponse(
129161
usage=usage,
162+
filedata_meta=filedata_meta_model.model_validate(filedata_meta.model_dump()),
130163
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
131164
status_code_text=f"failed to invoke plugin: "
132165
f"[{invoke_error.__class__.__name__}] {invoke_error}",
@@ -139,6 +172,11 @@ async def run_job(request: input_schema_model) -> ResponseType:
139172
log_func_and_body(func=func, body=request.json())
140173
# Create dictionary from pydantic model while preserving underlying types
141174
request_dict = {f: getattr(request, f) for f in request.model_fields}
175+
# Map FileData back to original dataclass if present
176+
if "file_data" in request_dict:
177+
request_dict["file_data"] = FileData.from_dict(
178+
request_dict["file_data"].model_dump()
179+
)
142180
map_inputs(func=func, raw_inputs=request_dict)
143181
if logger.level == LOG_LEVELS.get("trace", logging.NOTSET):
144182
logger.log(level=logger.level, msg=f"passing inputs to function: {request_dict}")

unstructured_platform_plugins/etl_uvicorn/otel.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ def get_settings() -> OtelSettings:
4242

4343
def get_trace_provider() -> TracerProvider:
4444
settings = get_settings()
45-
print(settings)
4645
provider = TracerProvider(resource=Resource({SERVICE_NAME: settings["service_name"]}))
4746

4847
for trace_exporter_type in settings["trace_exporters"]:

unstructured_platform_plugins/etl_uvicorn/utils.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
from unstructured_platform_plugins.type_hints import get_type_hints
1717

1818

19+
def is_optional(t: Any) -> bool:
    """Return True when ``t`` is a union type that includes ``None``.

    The previous check only tested that ``t`` had a non-``None``
    ``__origin__``, which is true for every parameterised generic
    (``list[int]``, ``dict[str, Any]``, ``Union[int, str]``, ...) and never
    looked for ``None`` among the union members.

    Args:
        t: Any annotation object (a class, a typing construct, ...).

    Returns:
        True for ``Optional[X]``, ``Union[..., None]`` and ``X | None``;
        False otherwise.
    """
    # Local imports: only part of this module's import block is visible here.
    import types
    from typing import Union, get_args, get_origin

    union_origins = [Union]
    # types.UnionType (PEP 604 unions) only exists on Python 3.10+.
    pep604_union = getattr(types, "UnionType", None)
    if pep604_union is not None:
        union_origins.append(pep604_union)

    origin = get_origin(t)
    if not any(origin is candidate for candidate in union_origins):
        return False
    return type(None) in get_args(t)
21+
22+
1923
def get_func(instance: Any, method_name: Optional[str] = None) -> Callable:
2024
method_name = method_name or "__call__"
2125
if inspect.isfunction(instance):
@@ -111,10 +115,10 @@ def map_inputs(func: Callable, raw_inputs: dict[str, Any]) -> dict[str, Any]:
111115
and not isinstance(type_data, GenericAlias)
112116
and issubclass(type_data, BaseModel)
113117
):
114-
raw_inputs[field_name] = type_data.model_validate(raw_inputs[field_name])
118+
field_value = raw_inputs[field_name]
119+
if isinstance(field_value, BaseModel):
120+
field_value = field_value.model_dump()
121+
raw_inputs[field_name] = type_data.model_validate(field_value)
115122
except Exception as e:
116-
exception_type = type(e)
117-
raise exception_type(
118-
f"failed to map input for field {field_name}: {field_value}"
119-
) from e
123+
raise ValueError(f"failed to map input for field {field_name}: {field_value}") from e
120124
return raw_inputs
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .filedata_meta import FileDataMeta, NewRecord
2+
from .usage import UsageData
3+
4+
__all__ = ["UsageData", "FileDataMeta", "NewRecord"]

0 commit comments

Comments
 (0)