Empty workflow run output, or: how to get the task output of a workflow run? #1989
-
Hi! First of all, many thanks for this great project!

I'm evaluating it with a simple example that mimics most of my intended usage (for now): basically a REST API to start tasks and retrieve their status and result. I can successfully start a task as instructed by the user guide, and I can retrieve the status of the run (see the server code below).

My question is: what is the cleanest option to get the result? I still have to go through the task orchestration part, but I assume the output is empty because a workflow run can be composed of multiple tasks, so I suppose I cannot rely on the run-level result.

My worker is set up like this:

import asyncio
from hatchet_sdk import Context
from hatchet_sdk import Hatchet
from pydantic import BaseModel
class FirstTaskInput(BaseModel):
    message: str
    delay: int

hatchet = Hatchet(debug=True)

@hatchet.task(name="FirstTask", input_validator=FirstTaskInput)
async def first_task(input: FirstTaskInput, ctx: Context) -> dict[str, str]:
    await asyncio.sleep(input.delay)
    return {"message": input.message}

def main() -> None:
    worker = hatchet.worker("test-worker", workflows=[first_task])
    worker.start()

if __name__ == "__main__":
    main()

and my FastAPI server is like this:

from typing import Any
import uvicorn
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from hatchet_sdk import V1TaskStatus
from worker import FirstTaskInput
from worker import first_task
from worker import hatchet
app = FastAPI()
@app.post("/task")
async def start_task(input: FirstTaskInput):
    ref = await first_task.aio_run_no_wait(input=input)
    return RedirectResponse(f"/task/{ref.workflow_run_id}", status_code=302)

@app.get("/task/{task_id}")
async def get_task(task_id: str) -> dict[str, Any]:
    status = await hatchet.runs.aio_get_status(task_id)
    if status == V1TaskStatus.COMPLETED:
        return await hatchet.runs.aio_get_result(task_id)
    return {"status": status.value}

def main():
    """Run the application."""
    uvicorn.run(
        "my_project_api:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="debug",
    )

if __name__ == "__main__":
    main()
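For anyone consuming an endpoint shaped like `GET /task/{task_id}` above, here is a minimal client-side polling sketch. It does not touch the Hatchet SDK at all: `poll_until_done`, `make_fake_fetch` and the `PENDING` set are hypothetical names, and the sketch assumes that a pending response looks like `{"status": "QUEUED"}` or `{"status": "RUNNING"}` while anything else is final.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Assumption: these are the non-terminal status strings the endpoint returns.
PENDING = {"QUEUED", "RUNNING"}


async def poll_until_done(
    fetch: Callable[[], Awaitable[dict[str, Any]]],
    interval: float = 0.5,
    timeout: float = 30.0,
) -> dict[str, Any]:
    """Call `fetch` repeatedly until the body is no longer a pending status."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        body = await fetch()
        # Anything that is not {"status": <pending>} is treated as final,
        # including failure statuses and the task result itself.
        if body.get("status") not in PENDING:
            return body
        if loop.time() >= deadline:
            raise TimeoutError("task did not finish before the timeout")
        await asyncio.sleep(interval)


def make_fake_fetch(responses: list[dict[str, Any]]) -> Callable[[], Awaitable[dict[str, Any]]]:
    """Hypothetical stand-in for an HTTP GET; replays canned responses."""
    it = iter(responses)

    async def fetch() -> dict[str, Any]:
        return next(it)

    return fetch


if __name__ == "__main__":
    fake = make_fake_fetch(
        [{"status": "QUEUED"}, {"status": "RUNNING"}, {"message": "hello"}]
    )
    print(asyncio.run(poll_until_done(fake, interval=0.01)))
```

One caveat with this response shape: a task whose own result contains a `status` key set to one of the pending strings would be indistinguishable from a pending run, so wrapping the result (for example under a `result` key) may be safer.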
-
For completeness: replacing the result retrieval with

run = await hatchet.runs.aio_get(task_id)
return run.tasks[-1].output

works. While this will be fine for many of my use cases, I'm still not sure this is the right thing to do...
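Since `run.tasks[-1].output` depends on the order of the task list, a workflow with several tasks may be better served by picking the output by task name. The sketch below is only an illustration of that idea: `TaskRecord` is a hypothetical stand-in for one entry of `run.tasks`, and its `name` and `output` fields are assumptions, so check the actual field names on the SDK's model before adapting it.

```python
from dataclasses import dataclass
from typing import Any, Optional, Sequence


@dataclass
class TaskRecord:
    # Hypothetical stand-in for one entry of `run.tasks`;
    # the real model may name these fields differently.
    name: str
    output: Optional[dict[str, Any]]


def output_for(tasks: Sequence[TaskRecord], task_name: str) -> dict[str, Any]:
    """Return the output of the single task called `task_name`."""
    matches = [t for t in tasks if t.name == task_name]
    if len(matches) != 1:
        raise LookupError(
            f"expected exactly one task named {task_name!r}, found {len(matches)}"
        )
    if matches[0].output is None:
        raise LookupError(f"task {task_name!r} has no output yet")
    return matches[0].output


if __name__ == "__main__":
    tasks = [
        TaskRecord(name="FirstTask", output={"message": "hello"}),
        TaskRecord(name="Cleanup", output={}),
    ]
    print(output_for(tasks, "FirstTask"))
```

Failing loudly on zero or multiple matches avoids silently returning the output of the wrong task when the workflow grows.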
-
Hey @sanzoghenzo, I think the best thing to do here would be to construct a [...] Let me know if that helps!
Yes, in that case it definitely makes sense to return the ID and then poll for the result! What you're doing should work; this looks like a bug on our end in the REST API. Fortunately, the gRPC API should help here and is less prone to the race conditions that also pop up in the REST API. Here's an example based on what you have now: