Skip to content

Commit 1154ed6

Browse files
committed
add support for filedata_meta to expose more data other than typical input/output
1 parent 20bb20f commit 1154ed6

File tree

9 files changed

+88
-21
lines changed

9 files changed

+88
-21
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 0.0.9
2+
3+
* **Support optionally exposing addition metadata around FileData**
4+
15
## 0.0.8
26

37
* **reduce usage data log level** We do not want to have so much verbosity for something that might happen a lot

test/assets/filedata_meta.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import math
2+
from typing import Optional
3+
4+
from pydantic import BaseModel
5+
from unstructured_ingest.v2.interfaces import FileData
6+
7+
from unstructured_platform_plugins.schema import FileDataMeta, NewRecord
8+
9+
10+
class Input(BaseModel):
11+
m: int
12+
13+
14+
class Output(BaseModel):
15+
n: float
16+
17+
18+
def process_input(i: Input, file_data: FileData, filedata_meta: FileDataMeta) -> Optional[Output]:
19+
if i.m > 10:
20+
filedata_meta.terminate_current = True
21+
new_content = [
22+
NewRecord(
23+
file_data=FileData(
24+
identifier=str(i.m + x), connector_type=file_data.connector_type
25+
),
26+
contents=Output(n=float(i.m + x)),
27+
)
28+
for x in range(5)
29+
]
30+
filedata_meta.new_records.extend(new_content)
31+
return None
32+
else:
33+
return Output(n=math.pi + i.m)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.0.8" # pragma: no cover
1+
__version__ = "0.0.9" # pragma: no cover

unstructured_platform_plugins/etl_uvicorn/api_generator.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,10 @@
2323
get_schema_dict,
2424
map_inputs,
2525
)
26-
from unstructured_platform_plugins.exceptions import UnrecoverableException
26+
from unstructured_platform_plugins.schema import FileDataMeta, UsageData
2727
from unstructured_platform_plugins.schema.json_schema import (
2828
schema_to_base_model,
2929
)
30-
from unstructured_platform_plugins.schema.usage import UsageData
3130

3231
logger = logging.getLogger("uvicorn.error")
3332

@@ -85,10 +84,11 @@ def wrap_in_fastapi(
8584
class InvokeResponse(BaseModel):
8685
usage: list[UsageData]
8786
status_code: int
87+
filedata_meta: FileDataMeta
8888
status_code_text: Optional[str] = None
8989
output: Optional[response_type] = None
9090

91-
input_schema = get_input_schema(func, omit=["usage"])
91+
input_schema = get_input_schema(func, omit=["usage", "filedata_meta"])
9292
input_schema_model = schema_to_base_model(input_schema)
9393

9494
logging.getLogger("etl_uvicorn.fastapi")
@@ -97,36 +97,41 @@ class InvokeResponse(BaseModel):
9797

9898
async def wrap_fn(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> ResponseType:
9999
usage: list[UsageData] = []
100+
filedata_meta = FileDataMeta()
100101
request_dict = kwargs if kwargs else {}
101102
if "usage" in inspect.signature(func).parameters:
102103
request_dict["usage"] = usage
103104
else:
104-
logger.debug("usage data not an expected parameter, omitting")
105+
logger.warning("usage data not an expected parameter, omitting")
106+
if "filedata_meta" in inspect.signature(func).parameters:
107+
request_dict["filedata_meta"] = filedata_meta
105108
try:
106109
if inspect.isasyncgenfunction(func):
107110
# Stream response if function is an async generator
108111

109112
async def _stream_response():
110113
async for output in func(**(request_dict or {})):
111114
yield InvokeResponse(
112-
usage=usage, status_code=status.HTTP_200_OK, output=output
115+
usage=usage,
116+
filedata_meta=filedata_meta,
117+
status_code=status.HTTP_200_OK,
118+
output=output,
113119
).model_dump_json() + "\n"
114120

115121
return StreamingResponse(_stream_response(), media_type="application/x-ndjson")
116122
else:
117-
try:
118-
output = await invoke_func(func=func, kwargs=request_dict)
119-
return InvokeResponse(
120-
usage=usage, status_code=status.HTTP_200_OK, output=output
121-
)
122-
except UnrecoverableException as ex:
123-
# Thrower of this exception is responsible for logging necessary information
124-
logger.info("Unrecoverable error occurred during plugin invocation")
125-
return InvokeResponse(usage=usage, status_code=512, status_code_text=ex.message)
123+
output = await invoke_func(func=func, kwargs=request_dict)
124+
return InvokeResponse(
125+
usage=usage,
126+
filedata_meta=filedata_meta,
127+
status_code=status.HTTP_200_OK,
128+
output=output,
129+
)
126130
except Exception as invoke_error:
127131
logger.error(f"failed to invoke plugin: {invoke_error}", exc_info=True)
128132
return InvokeResponse(
129133
usage=usage,
134+
filedata_meta=filedata_meta,
130135
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
131136
status_code_text=f"failed to invoke plugin: "
132137
f"[{invoke_error.__class__.__name__}] {invoke_error}",

unstructured_platform_plugins/etl_uvicorn/otel.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ def get_settings() -> OtelSettings:
4242

4343
def get_trace_provider() -> TracerProvider:
4444
settings = get_settings()
45-
print(settings)
4645
provider = TracerProvider(resource=Resource({SERVICE_NAME: settings["service_name"]}))
4746

4847
for trace_exporter_type in settings["trace_exporters"]:

unstructured_platform_plugins/etl_uvicorn/utils.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,8 @@ def map_inputs(func: Callable, raw_inputs: dict[str, Any]) -> dict[str, Any]:
111111
and not isinstance(type_data, GenericAlias)
112112
and issubclass(type_data, BaseModel)
113113
):
114-
raw_inputs[field_name] = type_data.model_validate(raw_inputs[field_name])
114+
field_value = raw_inputs[field_name]
115+
raw_inputs[field_name] = type_data.model_validate(field_value.model_dump())
115116
except Exception as e:
116-
exception_type = type(e)
117-
raise exception_type(
118-
f"failed to map input for field {field_name}: {field_value}"
119-
) from e
117+
raise ValueError(f"failed to map input for field {field_name}: {field_value}") from e
120118
return raw_inputs
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .filedata_meta import FileDataMeta, NewRecord
2+
from .usage import UsageData
3+
4+
__all__ = ["UsageData", "FileDataMeta", "NewRecord"]
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from typing import Any
2+
3+
from pydantic import BaseModel, Field
4+
from unstructured_ingest.v2.interfaces import FileData
5+
6+
7+
class NewRecord(BaseModel):
8+
file_data: FileData
9+
contents: Any
10+
11+
12+
class FileDataMeta(BaseModel):
13+
terminate_current: bool = False
14+
new_records: list[NewRecord] = Field(default_factory=list)

unstructured_platform_plugins/schema/json_schema.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ def dataclass_to_json_schema(class_or_instance: Any) -> dict:
113113
f_resp = to_json_schema(t)
114114
if f.default is not MISSING:
115115
f_resp["default"] = f.default.value if isinstance(f.default, Enum) else f.default
116+
elif f.default_factory is not MISSING:
117+
default = f.default_factory()
118+
try:
119+
f_resp["default"] = to_json_schema(default)
120+
except Exception:
121+
f_resp["default"] = default
116122
else:
117123
required.append(f.name)
118124
properties[f.name] = f_resp
@@ -339,6 +345,10 @@ def schema_to_base_model(schema: dict, name: str = "reconstructed_model") -> Typ
339345
t = schema_to_base_model_type(
340346
json_type_name=json_type_name, name=k, type_info=entry_info
341347
)
348+
elif "properties" in v:
349+
t = schema_to_base_model(schema=v, name=k)
350+
elif not v:
351+
t = schema_to_base_model(schema={"type": "null"}, name=k)
342352
else:
343353
json_type_name = v["type"]
344354
t = schema_to_base_model_type(json_type_name=json_type_name, name=k, type_info=v)

0 commit comments

Comments
 (0)