Skip to content

Commit 3f699db

Browse files
authored
feat: introduce emitting usage data (#14)
1 parent 693362b commit 3f699db

File tree

3 files changed

+49
-26
lines changed

3 files changed

+49
-26
lines changed

unstructured_platform_plugins/etl_uvicorn/api_generator.py

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import logging
66
from typing import Any, Callable, Optional
77

8-
from fastapi import FastAPI, HTTPException, status
8+
from fastapi import FastAPI, status
99
from pydantic import BaseModel
1010
from starlette.responses import RedirectResponse
1111
from uvicorn.importer import import_from_string
@@ -21,6 +21,7 @@
2121
from unstructured_platform_plugins.schema.json_schema import (
2222
schema_to_base_model,
2323
)
24+
from unstructured_platform_plugins.schema.usage import UsageData
2425

2526
logger = logging.getLogger("uvicorn.error")
2627

@@ -54,44 +55,56 @@ def generate_fast_api(
5455

5556
response_type = get_output_sig(func)
5657

57-
input_schema_model = schema_to_base_model(get_input_schema(func))
58+
class InvokeResponse(BaseModel):
59+
usage: list[UsageData]
60+
status_code: int
61+
status_code_text: Optional[str] = None
62+
output: Optional[response_type] = None
63+
64+
input_schema = get_input_schema(func, omit=["usage"])
65+
input_schema_model = schema_to_base_model(input_schema)
5866

5967
logging.getLogger("etl_uvicorn.fastapi")
6068

69+
usage: list[UsageData] = []
70+
71+
async def wrap_fn(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> InvokeResponse:
72+
request_dict = kwargs if kwargs else {}
73+
if "usage" in inspect.signature(func).parameters:
74+
request_dict["usage"] = usage
75+
else:
76+
logger.warning("usage data not an expected parameter, omitting")
77+
try:
78+
output = await invoke_func(func=func, kwargs=request_dict)
79+
return InvokeResponse(usage=usage, status_code=status.HTTP_200_OK, output=output)
80+
except Exception as invoke_error:
81+
logger.error(f"failed to invoke plugin: {invoke_error}", exc_info=True)
82+
return InvokeResponse(
83+
usage=usage,
84+
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
85+
status_code_text=f"failed to invoke plugin: "
86+
f"[{invoke_error.__class__.__name__}] {invoke_error}",
87+
)
88+
6189
if input_schema_model.model_fields:
62-
logger.debug(f"input model set to: {input_schema_model.model_fields}")
6390

64-
@fastapi_app.post("/invoke", response_model=response_type)
65-
async def run_job(request: input_schema_model) -> response_type:
91+
@fastapi_app.post("/invoke", response_model=InvokeResponse)
92+
async def run_job(request: input_schema_model) -> InvokeResponse:
6693
logger.debug(f"invoking function {func} with input: {request.model_dump()}")
6794
# Create dictionary from pydantic model while preserving underlying types
6895
request_dict = {f: getattr(request, f) for f in request.model_fields}
6996
map_inputs(func=func, raw_inputs=request_dict)
7097
logger.debug(f"passing inputs to function: {request_dict}")
71-
try:
72-
return await invoke_func(func=func, kwargs=request_dict)
73-
except Exception as e:
74-
logger.error(
75-
f"failed to invoke plugin with inputs {request_dict}: {e}", exc_info=True
76-
)
77-
raise HTTPException(
78-
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
79-
detail=f"failed to invoke plugin: {e}",
80-
)
98+
return await wrap_fn(func=func, kwargs=request_dict)
8199

82100
else:
83101

84102
@fastapi_app.post("/invoke", response_model=response_type)
85103
async def run_job() -> response_type:
86104
logger.debug(f"invoking function without inputs: {func}")
87-
try:
88-
return await invoke_func(func=func)
89-
except Exception as e:
90-
logger.error(f"failed to invoke plugin: {e}", exc_info=True)
91-
raise HTTPException(
92-
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
93-
detail=f"failed to invoke plugin: {e}",
94-
)
105+
return await wrap_fn(
106+
func=func,
107+
)
95108

96109
class SchemaOutputResponse(BaseModel):
97110
inputs: dict[str, Any]

unstructured_platform_plugins/etl_uvicorn/utils.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@ def get_plugin_id(instance: Any, method_name: Optional[str] = None) -> str:
5353
return ref_id
5454

5555

56-
def get_input_schema(func: Callable) -> dict:
56+
def get_input_schema(func: Callable, omit: Optional[list[str]] = None) -> dict:
57+
5758
parameters = get_typed_parameters(func)
59+
if omit:
60+
parameters = [p for p in parameters if p.name not in omit]
5861
return parameters_to_json_schema(parameters)
5962

6063

@@ -70,9 +73,9 @@ def get_output_schema(func: Callable) -> dict:
7073
return response_to_json_schema(get_output_sig(func))
7174

7275

73-
def get_schema_dict(func) -> dict:
76+
def get_schema_dict(func, omit: list[str] = ["usage"]) -> dict:
7477
return {
75-
"inputs": get_input_schema(func),
78+
"inputs": get_input_schema(func, omit=omit),
7679
"outputs": get_output_schema(func),
7780
}
7881

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from pydantic import BaseModel
2+
3+
4+
# This is a stand in until a supported schema is published external to this repo
5+
class UsageData(BaseModel):
6+
value: int
7+
name: str

0 commit comments

Comments
 (0)