Skip to content

feat/add support for arbitrary messages #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.0.24

* **Add support for passing messages back other than errors**

## 0.0.23

* **Handle errors in streaming responses**
Expand Down
2 changes: 1 addition & 1 deletion unstructured_platform_plugins/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.0.23" # pragma: no cover
__version__ = "0.0.24" # pragma: no cover
14 changes: 14 additions & 0 deletions unstructured_platform_plugins/etl_uvicorn/api_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ class EtlApiException(Exception):
logger = logging.getLogger("uvicorn.error")


class MessageChannels(BaseModel):
infos: list[str] = Field(default_factory=list)
warnings: list[str] = Field(default_factory=list)


def log_func_and_body(func: Callable, body: Optional[str] = None) -> None:
msg = None
if logger.level == LOG_LEVELS.get("debug", logging.NOTSET):
Expand Down Expand Up @@ -135,6 +140,7 @@ class InvokeResponse(BaseModel):
filedata_meta: Optional[filedata_meta_model]
status_code_text: Optional[str] = None
output: Optional[response_type] = None
message_channels: MessageChannels = Field(default_factory=MessageChannels)

input_schema = get_input_schema(func, omit=["usage", "filedata_meta"])
input_schema_model = schema_to_base_model(input_schema)
Expand All @@ -146,11 +152,14 @@ class InvokeResponse(BaseModel):
async def wrap_fn(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> ResponseType:
usage: list[UsageData] = []
filedata_meta = FileDataMeta()
message_channels = MessageChannels()
request_dict = kwargs if kwargs else {}
if "usage" in inspect.signature(func).parameters:
request_dict["usage"] = usage
else:
logger.warning("usage data not an expected parameter, omitting")
if "message_channels" in inspect.signature(func).parameters:
request_dict["message_channels"] = message_channels
if "filedata_meta" in inspect.signature(func).parameters:
request_dict["filedata_meta"] = filedata_meta
try:
Expand All @@ -161,6 +170,7 @@ async def _stream_response():
async for output in func(**(request_dict or {})):
yield InvokeResponse(
usage=usage,
message_channels=message_channels,
filedata_meta=filedata_meta_model.model_validate(
filedata_meta.model_dump()
),
Expand All @@ -171,6 +181,7 @@ async def _stream_response():
logger.error(f"Failure streaming response: {e}", exc_info=True)
yield InvokeResponse(
usage=usage,
message_channels=message_channels,
filedata_meta=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
status_code_text=f"[{e.__class__.__name__}] {e}",
Expand All @@ -181,6 +192,7 @@ async def _stream_response():
output = await invoke_func(func=func, kwargs=request_dict)
return InvokeResponse(
usage=usage,
message_channels=message_channels,
filedata_meta=filedata_meta_model.model_validate(filedata_meta.model_dump()),
status_code=status.HTTP_200_OK,
output=output,
Expand All @@ -189,6 +201,7 @@ async def _stream_response():
logger.info("Unrecoverable error occurred during plugin invocation")
return InvokeResponse(
usage=usage,
message_channels=message_channels,
status_code=512,
status_code_text=ex.message,
filedata_meta=filedata_meta_model.model_validate(filedata_meta.model_dump()),
Expand All @@ -198,6 +211,7 @@ async def _stream_response():
http_error = wrap_error(invoke_error)
return InvokeResponse(
usage=usage,
message_channels=message_channels,
filedata_meta=filedata_meta_model.model_validate(filedata_meta.model_dump()),
status_code=http_error.status_code,
status_code_text=f"[{invoke_error.__class__.__name__}] {invoke_error}",
Expand Down