From 2391c6ff72233bca18c5cbf5a7783209d5b101bf Mon Sep 17 00:00:00 2001 From: vangheem Date: Wed, 25 Sep 2024 10:53:27 -0400 Subject: [PATCH 1/9] streaming invoke responses poc --- .../etl_uvicorn/api_generator.py | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/unstructured_platform_plugins/etl_uvicorn/api_generator.py b/unstructured_platform_plugins/etl_uvicorn/api_generator.py index 9df5f4d..2815fbf 100644 --- a/unstructured_platform_plugins/etl_uvicorn/api_generator.py +++ b/unstructured_platform_plugins/etl_uvicorn/api_generator.py @@ -1,10 +1,12 @@ import asyncio +from fastapi.responses import StreamingResponse import hashlib import inspect import json import logging from functools import partial from typing import Any, Callable, Optional +import inspect from fastapi import FastAPI, status from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor @@ -26,6 +28,7 @@ schema_to_base_model, ) from unstructured_platform_plugins.schema.usage import UsageData +import concurrent.futures logger = logging.getLogger("uvicorn.error") @@ -46,12 +49,29 @@ def log_func_and_body(func: Callable, body: Optional[str] = None) -> None: logger.log(level=logger.level, msg=msg) +async def run_generator_in_executor(generator_func, **kwargs): + loop = asyncio.get_event_loop() + # Create a future that will yield the generator values one by one + gen = generator_func(**kwargs) + while True: + result = await loop.run_in_executor(None, next, gen, None) + if result is None: + break + yield result + + async def invoke_func(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> Any: kwargs = kwargs or {} - if inspect.iscoroutinefunction(func): - return await func(**kwargs) + if inspect.isasyncgenfunction(func): + async for val in func(**kwargs): + yield val + elif inspect.isgeneratorfunction(func): + async for val in run_generator_in_executor(func, **kwargs): + yield val + elif inspect.iscoroutinefunction(func): + yield await func(**kwargs) else: - return await asyncio.get_event_loop().run_in_executor(None, partial(func, **kwargs)) + yield await asyncio.get_event_loop().run_in_executor(None, partial(func, **kwargs)) def check_precheck_func(precheck_func: Callable): @@ -95,7 +115,7 @@ def generate_fast_api( logger.debug(f"set static id response to: {plugin_id}") - fastapi_app = FastAPI() + fastapi_app = FastAPI(debug=True) response_type = get_output_sig(func) @@ -118,8 +138,14 @@ async def wrap_fn(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> In else: logger.warning("usage data not an expected parameter, omitting") try: - output = await invoke_func(func=func, kwargs=request_dict) - return InvokeResponse(usage=usage, status_code=status.HTTP_200_OK, output=output) + + async def _stream_response(): + async for output in invoke_func(func=func, kwargs=request_dict): + yield InvokeResponse( + usage=usage, status_code=status.HTTP_200_OK, output=output + ).model_dump_json() + "\n" + + return StreamingResponse(_stream_response(), media_type="application/x-ndjson") except Exception as invoke_error: logger.error(f"failed to invoke plugin: {invoke_error}", exc_info=True) return InvokeResponse( From c747f2e73414777b6dfe91c066d57c0ee00f7dbc Mon Sep 17 00:00:00 2001 From: vangheem Date: Wed, 25 Sep 2024 14:45:11 -0400 Subject: [PATCH 2/9] fix --- unstructured_platform_plugins/etl_uvicorn/api_generator.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/unstructured_platform_plugins/etl_uvicorn/api_generator.py b/unstructured_platform_plugins/etl_uvicorn/api_generator.py index 2815fbf..0e4c449 100644 --- a/unstructured_platform_plugins/etl_uvicorn/api_generator.py +++ b/unstructured_platform_plugins/etl_uvicorn/api_generator.py @@ -1,14 +1,13 @@ import asyncio -from fastapi.responses import StreamingResponse import hashlib import inspect import json import logging from functools import partial from typing import Any, Callable, Optional -import inspect from fastapi import FastAPI, status +from fastapi.responses import StreamingResponse from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from pydantic import BaseModel from starlette.responses import RedirectResponse @@ -28,7 +27,6 @@ schema_to_base_model, ) from unstructured_platform_plugins.schema.usage import UsageData -import concurrent.futures logger = logging.getLogger("uvicorn.error") From f2172e0b6b724fc840c5f4a61ff1a78f4d2ea825 Mon Sep 17 00:00:00 2001 From: vangheem Date: Wed, 25 Sep 2024 14:45:33 -0400 Subject: [PATCH 3/9] nope --- unstructured_platform_plugins/etl_uvicorn/api_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unstructured_platform_plugins/etl_uvicorn/api_generator.py b/unstructured_platform_plugins/etl_uvicorn/api_generator.py index 0e4c449..296ec46 100644 --- a/unstructured_platform_plugins/etl_uvicorn/api_generator.py +++ b/unstructured_platform_plugins/etl_uvicorn/api_generator.py @@ -113,7 +113,7 @@ def generate_fast_api( logger.debug(f"set static id response to: {plugin_id}") - fastapi_app = FastAPI(debug=True) + fastapi_app = FastAPI() response_type = get_output_sig(func) From 263fc28dc26da3c3b32f1a7613819c79ae9cd9cf Mon Sep 17 00:00:00 2001 From: vangheem Date: Wed, 25 Sep 2024 14:47:35 -0400 Subject: [PATCH 4/9] bump --- CHANGELOG.md | 4 ++++ unstructured_platform_plugins/__version__.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0342fcd..3dd3e24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.0.6 + +* **Support streaming response types for /invoke** + ## 0.0.5 * **Improve logging to hide body in case of sensitive data unless TRACE level** diff --git a/unstructured_platform_plugins/__version__.py b/unstructured_platform_plugins/__version__.py index cec386a..62c73e6 100644 --- a/unstructured_platform_plugins/__version__.py +++ b/unstructured_platform_plugins/__version__.py @@ -1 +1 @@ -__version__ = "0.0.5" # pragma: no cover +__version__ = "0.0.6" # pragma: no cover From 4ecb6b8ed6b0c89ebf2953fd87109bebc40ef1c6 Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 26 Sep 2024 09:04:27 -0400 Subject: [PATCH 5/9] better --- CHANGELOG.md | 2 +- .../etl_uvicorn/api_generator.py | 45 +++++++++---------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dd3e24..98cc5d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## 0.0.6 -* **Support streaming response types for /invoke** +* **Support streaming response types for /invoke if callable is async generator** ## 0.0.5 diff --git a/unstructured_platform_plugins/etl_uvicorn/api_generator.py b/unstructured_platform_plugins/etl_uvicorn/api_generator.py index 296ec46..11536f6 100644 --- a/unstructured_platform_plugins/etl_uvicorn/api_generator.py +++ b/unstructured_platform_plugins/etl_uvicorn/api_generator.py @@ -47,26 +47,15 @@ def log_func_and_body(func: Callable, body: Optional[str] = None) -> None: logger.log(level=logger.level, msg=msg) -async def run_generator_in_executor(generator_func, **kwargs): - loop = asyncio.get_event_loop() - # Create a future that will yield the generator values one by one - gen = generator_func(**kwargs) - while True: - result = await loop.run_in_executor(None, next, gen, None) - if result is None: - break - yield result +async def invoke_async_gen_func(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> Any: + kwargs = kwargs or {} + async for val in func(**kwargs): + yield val async def invoke_func(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> Any: kwargs = kwargs or {} - if inspect.isasyncgenfunction(func): - async for val in func(**kwargs): - yield val - elif inspect.isgeneratorfunction(func): - async for val in run_generator_in_executor(func, **kwargs): - yield val - elif inspect.iscoroutinefunction(func): + if inspect.iscoroutinefunction(func): yield await func(**kwargs) else: yield await asyncio.get_event_loop().run_in_executor(None, partial(func, **kwargs)) @@ -136,14 +125,22 @@ async def wrap_fn(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> In else: logger.warning("usage data not an expected parameter, omitting") try: - - async def _stream_response(): - async for output in invoke_func(func=func, kwargs=request_dict): - yield InvokeResponse( - usage=usage, status_code=status.HTTP_200_OK, output=output - ).model_dump_json() + "\n" - - return StreamingResponse(_stream_response(), media_type="application/x-ndjson") + if inspect.isasyncgenfunction(func): + # Stream response if function is an async generator + + async def _stream_response(): + async for output in invoke_async_gen_func(func=func, kwargs=request_dict): + yield InvokeResponse( + usage=usage, status_code=status.HTTP_200_OK, output=output + ).model_dump_json() + "\n" + + return StreamingResponse(_stream_response(), media_type="application/x-ndjson") + else: + return InvokeResponse( + usage=usage, + status_code=status.HTTP_200_OK, + output=await invoke_func(func, request_dict), + ) except Exception as invoke_error: logger.error(f"failed to invoke plugin: {invoke_error}", exc_info=True) return InvokeResponse( From b2f0dbc408bed9303a8efecbef175633a684124f Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 26 Sep 2024 09:04:55 -0400 Subject: [PATCH 6/9] fix --- unstructured_platform_plugins/etl_uvicorn/api_generator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unstructured_platform_plugins/etl_uvicorn/api_generator.py b/unstructured_platform_plugins/etl_uvicorn/api_generator.py index 11536f6..c5b9f47 100644 --- a/unstructured_platform_plugins/etl_uvicorn/api_generator.py +++ b/unstructured_platform_plugins/etl_uvicorn/api_generator.py @@ -56,9 +56,9 @@ async def invoke_async_gen_func(func: Callable, kwargs: Optional[dict[str, Any]] async def invoke_func(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> Any: kwargs = kwargs or {} if inspect.iscoroutinefunction(func): - yield await func(**kwargs) + return await func(**kwargs) else: - yield await asyncio.get_event_loop().run_in_executor(None, partial(func, **kwargs)) + return await asyncio.get_event_loop().run_in_executor(None, partial(func, **kwargs)) def check_precheck_func(precheck_func: Callable): From 6341f44835f397fe211ab6398fdb332c0716e02d Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 26 Sep 2024 09:05:29 -0400 Subject: [PATCH 7/9] format --- unstructured_platform_plugins/etl_uvicorn/api_generator.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/unstructured_platform_plugins/etl_uvicorn/api_generator.py b/unstructured_platform_plugins/etl_uvicorn/api_generator.py index c5b9f47..dd392c1 100644 --- a/unstructured_platform_plugins/etl_uvicorn/api_generator.py +++ b/unstructured_platform_plugins/etl_uvicorn/api_generator.py @@ -136,11 +136,8 @@ async def _stream_response(): return StreamingResponse(_stream_response(), media_type="application/x-ndjson") else: - return InvokeResponse( - usage=usage, - status_code=status.HTTP_200_OK, - output=await invoke_func(func, request_dict), - ) + output = await invoke_func(func=func, kwargs=request_dict) + return InvokeResponse(usage=usage, status_code=status.HTTP_200_OK, output=output) except Exception as invoke_error: logger.error(f"failed to invoke plugin: {invoke_error}", exc_info=True) return InvokeResponse( From fceaadaaee2adcbfe473f0682427d43298b9315d Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 26 Sep 2024 09:07:02 -0400 Subject: [PATCH 8/9] better --- .../etl_uvicorn/api_generator.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/unstructured_platform_plugins/etl_uvicorn/api_generator.py b/unstructured_platform_plugins/etl_uvicorn/api_generator.py index dd392c1..e5db8d7 100644 --- a/unstructured_platform_plugins/etl_uvicorn/api_generator.py +++ b/unstructured_platform_plugins/etl_uvicorn/api_generator.py @@ -47,12 +47,6 @@ def log_func_and_body(func: Callable, body: Optional[str] = None) -> None: logger.log(level=logger.level, msg=msg) -async def invoke_async_gen_func(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> Any: - kwargs = kwargs or {} - async for val in func(**kwargs): - yield val - - async def invoke_func(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> Any: kwargs = kwargs or {} if inspect.iscoroutinefunction(func): @@ -129,7 +123,7 @@ async def wrap_fn(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> In # Stream response if function is an async generator async def _stream_response(): - async for output in invoke_async_gen_func(func=func, kwargs=request_dict): + async for output in func(**(request_dict or {})): yield InvokeResponse( usage=usage, status_code=status.HTTP_200_OK, output=output ).model_dump_json() + "\n" From ce09f936b15f4cd90f39e446b6fbea1a9fef7ba5 Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 26 Sep 2024 10:21:18 -0400 Subject: [PATCH 9/9] typing --- .../etl_uvicorn/api_generator.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/unstructured_platform_plugins/etl_uvicorn/api_generator.py b/unstructured_platform_plugins/etl_uvicorn/api_generator.py index e5db8d7..c19cbb2 100644 --- a/unstructured_platform_plugins/etl_uvicorn/api_generator.py +++ b/unstructured_platform_plugins/etl_uvicorn/api_generator.py @@ -111,7 +111,9 @@ class InvokeResponse(BaseModel): logging.getLogger("etl_uvicorn.fastapi") - async def wrap_fn(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> InvokeResponse: + ResponseType = StreamingResponse if inspect.isasyncgenfunction(func) else InvokeResponse + + async def wrap_fn(func: Callable, kwargs: Optional[dict[str, Any]] = None) -> ResponseType: usage: list[UsageData] = [] request_dict = kwargs if kwargs else {} if "usage" in inspect.signature(func).parameters: @@ -144,7 +146,7 @@ async def _stream_response(): if input_schema_model.model_fields: @fastapi_app.post("/invoke", response_model=InvokeResponse) - async def run_job(request: input_schema_model) -> InvokeResponse: + async def run_job(request: input_schema_model) -> ResponseType: log_func_and_body(func=func, body=request.json()) # Create dictionary from pydantic model while preserving underlying types request_dict = {f: getattr(request, f) for f in request.model_fields} @@ -156,7 +158,7 @@ async def run_job(request: input_schema_model) -> InvokeResponse: else: @fastapi_app.post("/invoke", response_model=InvokeResponse) - async def run_job() -> InvokeResponse: + async def run_job() -> ResponseType: log_func_and_body(func=func) return await wrap_fn( func=func,