diff --git a/pyproject.toml b/pyproject.toml index 44430e0..2795cdc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-llamaindex" -version = "0.0.19" +version = "0.0.20" description = "UiPath LlamaIndex SDK" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.10" diff --git a/samples/action-center-hitl-agent/.env.example b/samples/action-center-hitl-agent/.env.example new file mode 100644 index 0000000..36643c0 --- /dev/null +++ b/samples/action-center-hitl-agent/.env.example @@ -0,0 +1 @@ +OPENAI_API_KEY=xxx diff --git a/samples/action-center-hitl-agent/agent.mermaid b/samples/action-center-hitl-agent/agent.mermaid new file mode 100644 index 0000000..18ba266 --- /dev/null +++ b/samples/action-center-hitl-agent/agent.mermaid @@ -0,0 +1,35 @@ +flowchart TD + step__done["_done"]:::stepStyle + step_aggregate_tool_results["aggregate_tool_results"]:::stepStyle + step_call_tool["call_tool"]:::stepStyle + step_init_run["init_run"]:::stepStyle + step_parse_agent_output["parse_agent_output"]:::stepStyle + step_run_agent_step["run_agent_step"]:::stepStyle + step_setup_agent["setup_agent"]:::stepStyle + event_StopEvent([

StopEvent

]):::stopEventStyle + event_ToolCallResult([

ToolCallResult

]):::defaultEventStyle + event_AgentInput([

AgentInput

]):::defaultEventStyle + event_ToolCall([

ToolCall

]):::defaultEventStyle + event_AgentWorkflowStartEvent([

AgentWorkflowStartEvent

]):::defaultEventStyle + event_AgentOutput([

AgentOutput

]):::defaultEventStyle + event_AgentSetup([

AgentSetup

]):::defaultEventStyle + event_StopEvent --> step__done + step_aggregate_tool_results --> event_AgentInput + step_aggregate_tool_results --> event_StopEvent + event_ToolCallResult --> step_aggregate_tool_results + step_call_tool --> event_ToolCallResult + event_ToolCall --> step_call_tool + step_init_run --> event_AgentInput + event_AgentWorkflowStartEvent --> step_init_run + step_parse_agent_output --> event_StopEvent + step_parse_agent_output --> event_ToolCall + event_AgentOutput --> step_parse_agent_output + step_run_agent_step --> event_AgentOutput + event_AgentSetup --> step_run_agent_step + step_setup_agent --> event_AgentSetup + event_AgentInput --> step_setup_agent + classDef stepStyle fill:#f2f0ff,line-height:1.2 + classDef externalStyle fill:#f2f0ff,line-height:1.2 + classDef defaultEventStyle fill-opacity:0 + classDef stopEventStyle fill:#bfb6fc + classDef inputRequiredStyle fill:#f2f0ff,line-height:1.2 \ No newline at end of file diff --git a/samples/action-center-hitl-agent/llama_index.json b/samples/action-center-hitl-agent/llama_index.json new file mode 100644 index 0000000..c088db6 --- /dev/null +++ b/samples/action-center-hitl-agent/llama_index.json @@ -0,0 +1,7 @@ +{ + "dependencies": ["."], + "workflows": { + "agent": "main.py:workflow" + }, + "env": ".env" +} diff --git a/samples/action-center-hitl-agent/main.py b/samples/action-center-hitl-agent/main.py new file mode 100644 index 0000000..be96195 --- /dev/null +++ b/samples/action-center-hitl-agent/main.py @@ -0,0 +1,58 @@ +import json + +from dotenv import load_dotenv +from llama_index.core.agent.workflow import AgentWorkflow +from llama_index.core.workflow import ( + Context, + HumanResponseEvent, +) +from llama_index.llms.openai import OpenAI + +from uipath_llamaindex.models import CreateActionEvent + +load_dotenv() + +llm = OpenAI(model="gpt-4o-mini") + + +async def may_research_company(ctx: Context, company_name: str) -> bool: + """Find whether a company may be researched. + Args: + ctx (Context): The context in which this function is called (autopopulated). + company_name (str): Name of the company to be researched. + Returns: + bool: True if the company can be researched, False otherwise. + """ + # emit an event to the external stream to be captured + agent_name = "Company researcher" + ctx.write_event_to_stream( + CreateActionEvent( + prefix="hitl escalation to research company", + app_name="generic_escalation_app", + title=f"Action required for {agent_name}", + data={ + "AgentOutput": (f"May I perform a research on company {company_name}?"), + "AgentName": agent_name, + }, + app_version=1, + app_folder_path="Shared", + # assignee="(optional)" + ) + ) + + # wait until we see a HumanResponseEvent + hitl_response = await ctx.wait_for_event(HumanResponseEvent) + feedback = json.loads(hitl_response.response) + # act on the input from the event + if isinstance(feedback["Answer"], bool) and feedback["Answer"] is True: + return True + else: + return False + + +# example user input {"user_msg": "research Uipath company"} +workflow = AgentWorkflow.from_tools_or_functions( + [may_research_company()], + llm=llm, + system_prompt="You are a helpful assistant that can use tools to perform actions requested by user", +) diff --git a/samples/action-center-hitl-agent/pyproject.toml b/samples/action-center-hitl-agent/pyproject.toml new file mode 100644 index 0000000..fc4ef6a --- /dev/null +++ b/samples/action-center-hitl-agent/pyproject.toml @@ -0,0 +1,11 @@ +[project] +name = "llama-action-center" +version = "0.0.1" +description = "UiPath LlamaIndex Simple HITL Agent" +authors = [{ name = "John Doe", email = "john.doe@myemail.com" }] +readme = { file = "README.md", content-type = "text/markdown" } +requires-python = ">=3.10" +dependencies = [ + "uipath-llamaindex==0.0.20", + "llama-index-llms-openai>=0.2.2" +] diff --git a/samples/action-center-hitl-agent/uipath.json b/samples/action-center-hitl-agent/uipath.json new file mode 100644 index 0000000..f4aa963 --- /dev/null +++ b/samples/action-center-hitl-agent/uipath.json @@ -0,0 +1,27 @@ +{ + "entryPoints": [ + { + "filePath": "agent", + "uniqueId": "ca9074cb-d758-483c-a88d-4346b3214fb7", + "type": "agent", + "input": { + "type": "object", + "properties": { + "hack": { + "type": "object" + } + }, + "required": [] + }, + "output": { + "type": "object", + "properties": {}, + "required": [] + } + } + ], + "bindings": { + "version": "2.0", + "resources": [] + } +} diff --git a/samples/multi-agent/.env.example b/samples/multi-agent/.env.example new file mode 100644 index 0000000..36643c0 --- /dev/null +++ b/samples/multi-agent/.env.example @@ -0,0 +1 @@ +OPENAI_API_KEY=xxx diff --git a/samples/multi-agent/agent.mermaid b/samples/multi-agent/agent.mermaid new file mode 100644 index 0000000..18ba266 --- /dev/null +++ b/samples/multi-agent/agent.mermaid @@ -0,0 +1,35 @@ +flowchart TD + step__done["_done"]:::stepStyle + step_aggregate_tool_results["aggregate_tool_results"]:::stepStyle + step_call_tool["call_tool"]:::stepStyle + step_init_run["init_run"]:::stepStyle + step_parse_agent_output["parse_agent_output"]:::stepStyle + step_run_agent_step["run_agent_step"]:::stepStyle + step_setup_agent["setup_agent"]:::stepStyle + event_StopEvent([

StopEvent

]):::stopEventStyle + event_ToolCallResult([

ToolCallResult

]):::defaultEventStyle + event_AgentInput([

AgentInput

]):::defaultEventStyle + event_ToolCall([

ToolCall

]):::defaultEventStyle + event_AgentWorkflowStartEvent([

AgentWorkflowStartEvent

]):::defaultEventStyle + event_AgentOutput([

AgentOutput

]):::defaultEventStyle + event_AgentSetup([

AgentSetup

]):::defaultEventStyle + event_StopEvent --> step__done + step_aggregate_tool_results --> event_AgentInput + step_aggregate_tool_results --> event_StopEvent + event_ToolCallResult --> step_aggregate_tool_results + step_call_tool --> event_ToolCallResult + event_ToolCall --> step_call_tool + step_init_run --> event_AgentInput + event_AgentWorkflowStartEvent --> step_init_run + step_parse_agent_output --> event_StopEvent + step_parse_agent_output --> event_ToolCall + event_AgentOutput --> step_parse_agent_output + step_run_agent_step --> event_AgentOutput + event_AgentSetup --> step_run_agent_step + step_setup_agent --> event_AgentSetup + event_AgentInput --> step_setup_agent + classDef stepStyle fill:#f2f0ff,line-height:1.2 + classDef externalStyle fill:#f2f0ff,line-height:1.2 + classDef defaultEventStyle fill-opacity:0 + classDef stopEventStyle fill:#bfb6fc + classDef inputRequiredStyle fill:#f2f0ff,line-height:1.2 \ No newline at end of file diff --git a/samples/multi-agent/llama_index.json b/samples/multi-agent/llama_index.json new file mode 100644 index 0000000..c088db6 --- /dev/null +++ b/samples/multi-agent/llama_index.json @@ -0,0 +1,7 @@ +{ + "dependencies": ["."], + "workflows": { + "agent": "main.py:workflow" + }, + "env": ".env" +} diff --git a/samples/multi-agent/main.py b/samples/multi-agent/main.py new file mode 100644 index 0000000..d6702e1 --- /dev/null +++ b/samples/multi-agent/main.py @@ -0,0 +1,49 @@ +import json + +from dotenv import load_dotenv +from llama_index.core.agent.workflow import AgentWorkflow +from llama_index.core.workflow import ( + Context, + HumanResponseEvent, +) +from llama_index.llms.openai import OpenAI + +from uipath_llamaindex.models import InvokeProcessEvent + +load_dotenv() + +llm = OpenAI(model="gpt-4o-mini") + + +async def may_research_company(ctx: Context, company_name: str) -> str: + """Find whether a company may be researcher. + Args: + ctx (Context): The context in which this function is called (autopopulated). + company_name (str): Name of the company to be researched. + Returns: + str: company report + """ + # emit an event to the external stream to be captured + ctx.write_event_to_stream( + InvokeProcessEvent( + prefix="invoke langgraph researcher event", + name="my-first-uipath-agent", + # process_folder_path="(optional)", + input_arguments={ + "topic": company_name, + }, + ) + ) + + # wait until we see a HumanResponseEvent + hitl_response = await ctx.wait_for_event(HumanResponseEvent) + feedback = json.loads(hitl_response.response) + # act on the input from the event + return feedback["report"] + + +workflow = AgentWorkflow.from_tools_or_functions( + [may_research_company], + llm=llm, + system_prompt="You are a helpful assistant that can decide whether a company can be researched or not.", +) diff --git a/samples/multi-agent/pyproject.toml b/samples/multi-agent/pyproject.toml new file mode 100644 index 0000000..2a1d3e3 --- /dev/null +++ b/samples/multi-agent/pyproject.toml @@ -0,0 +1,11 @@ +[project] +name = "llama-multi-agent" +version = "0.0.1" +description = "UiPath LlamaIndex Simple HITL Agent" +authors = [{ name = "John Doe", email = "john.doe@myemail.com" }] +readme = { file = "README.md", content-type = "text/markdown" } +requires-python = ">=3.10" +dependencies = [ + "uipath-llamaindex==0.0.20", + "llama-index-llms-openai>=0.2.2" +] diff --git a/samples/multi-agent/uipath.json b/samples/multi-agent/uipath.json new file mode 100644 index 0000000..f078efc --- /dev/null +++ b/samples/multi-agent/uipath.json @@ -0,0 +1,27 @@ +{ + "entryPoints": [ + { + "filePath": "agent", + "uniqueId": "ca9074cb-d758-483c-a88d-4346b3214fb8", + "type": "agent", + "input": { + "type": "object", + "properties": { + "hack": { + "type": "object" + } + }, + "required": [] + }, + "output": { + "type": "object", + "properties": {}, + "required": [] + } + } + ], + "bindings": { + "version": "2.0", + "resources": [] + } +} diff --git a/samples/simple-hitl-agent/main.py b/samples/simple-hitl-agent/main.py index fabbbcc..25d535d 100644 --- a/samples/simple-hitl-agent/main.py +++ b/samples/simple-hitl-agent/main.py @@ -9,13 +9,21 @@ llm = OpenAI(model="gpt-4o-mini") -async def research_company(ctx: Context) -> str: - """Research a company.""" +async def may_research_company(ctx: Context, company_name: str) -> bool: + """Find whether a company may be researched. + Args: + ctx (Context): The context in which this function is called (autopopulated). + company_name (str): Name of the company to be researched. + Returns: + bool: True if the company can be researched, False otherwise. + """ print("Researching company...") # emit an event to the external stream to be captured ctx.write_event_to_stream( - InputRequiredEvent(prefix="Are you sure you want to proceed?") + InputRequiredEvent( + prefix=f"May I perform a research on company {company_name}? \n (yes/no)" + ) ) # wait until we see a HumanResponseEvent @@ -24,13 +32,13 @@ async def research_company(ctx: Context) -> str: # act on the input from the event if response.response.strip().lower() == "yes": - return "Research completed successfully." + return True else: - return "Research task aborted." + return False workflow = AgentWorkflow.from_tools_or_functions( - [research_company], + [may_research_company], llm=llm, - system_prompt="You are a helpful assistant that can research companies.", + system_prompt="You are a helpful assistant that can decide whether a company can be researched or not.", ) diff --git a/samples/simple-hitl-agent/pyproject.toml b/samples/simple-hitl-agent/pyproject.toml index 4461e02..7d5ed85 100644 --- a/samples/simple-hitl-agent/pyproject.toml +++ b/samples/simple-hitl-agent/pyproject.toml @@ -1,11 +1,11 @@ [project] name = "llama-simple-hitl-agent" -version = "0.0.8" +version = "0.0.1" description = "UiPath LlamaIndex Simple HITL Agent" authors = [{ name = "John Doe", email = "john.doe@myemail.com" }] readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.10" dependencies = [ - "uipath-llamaindex>=0.0.18", + "uipath-llamaindex==0.0.20", "llama-index-llms-openai>=0.2.2" ] diff --git a/samples/simple-hitl-agent/uipath.json b/samples/simple-hitl-agent/uipath.json index 0632247..f4aa963 100644 --- a/samples/simple-hitl-agent/uipath.json +++ b/samples/simple-hitl-agent/uipath.json @@ -2,7 +2,7 @@ "entryPoints": [ { "filePath": "agent", - "uniqueId": "ca9074cb-d758-483c-a88d-4346b3214fb6", + "uniqueId": "ca9074cb-d758-483c-a88d-4346b3214fb7", "type": "agent", "input": { "type": "object", diff --git a/samples/simple-hitl-agent/uv.lock b/samples/simple-hitl-agent/uv.lock index a0b095c..e223a57 100644 --- a/samples/simple-hitl-agent/uv.lock +++ b/samples/simple-hitl-agent/uv.lock @@ -1110,20 +1110,8 @@ wheels = [ ] [[package]] -name = "llama-parse" -version = "0.6.23" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "llama-cloud-services" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/11/30/8170f31a139314479dafea5e2140405e5127207fbc95056aedee01d6b7fd/llama_parse-0.6.23.tar.gz", hash = "sha256:ee575d12660de57264900ae414e7c61646db9fbdd030fd795fb4ce4d28d83f85", size = 3537 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/cd/53/a7e06181f36eaff6c7176e33644db2f19a7761fc9110f0a5d13759ce94ac/llama_parse-0.6.23-py3-none-any.whl", hash = "sha256:8290b08c28fb6cec17b22df7bf37678bbf4f76e8864eccefdbe5d4c04b0e994a", size = 4944 }, -] - -[[package]] -name = "llama-simple-hitl-agent" -version = "0.0.7" +name = "llama-new" +version = "0.0.2" source = { virtual = "." } dependencies = [ { name = "llama-index-llms-openai" }, @@ -1133,7 +1121,19 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "llama-index-llms-openai", specifier = ">=0.2.2" }, - { name = "uipath-llamaindex", specifier = ">=0.0.17" }, + { name = "uipath-llamaindex", specifier = "==0.0.20.dev1000220026", index = "https://test.pypi.org/simple/" }, +] + +[[package]] +name = "llama-parse" +version = "0.6.23" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "llama-cloud-services" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/11/30/8170f31a139314479dafea5e2140405e5127207fbc95056aedee01d6b7fd/llama_parse-0.6.23.tar.gz", hash = "sha256:ee575d12660de57264900ae414e7c61646db9fbdd030fd795fb4ce4d28d83f85", size = 3537 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cd/53/a7e06181f36eaff6c7176e33644db2f19a7761fc9110f0a5d13759ce94ac/llama_parse-0.6.23-py3-none-any.whl", hash = "sha256:8290b08c28fb6cec17b22df7bf37678bbf4f76e8864eccefdbe5d4c04b0e994a", size = 4944 }, ] [[package]] @@ -2605,16 +2605,16 @@ wheels = [ [[package]] name = "uipath-llamaindex" -version = "0.0.17" -source = { registry = "https://pypi.org/simple" } +version = "0.0.20.dev1000220026" +source = { registry = "https://test.pypi.org/simple/" } dependencies = [ { name = "llama-index" }, { name = "openinference-instrumentation-llama-index" }, { name = "uipath" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/cf/e3/01e45a09b47cb8419c01146c99d71c806afa44612107dd504a2011f1be4c/uipath_llamaindex-0.0.17.tar.gz", hash = "sha256:ac646a2023ccef76f52bdd3e1f37323dc4cc5d7f16f891815859960c693d57ad", size = 589828 } +sdist = { url = "https://test-files.pythonhosted.org/packages/98/ce/6d3c35c57148a6cd9495c9311a7958cf0ab3a7c8e3a658d394d12c76fca7/uipath_llamaindex-0.0.20.dev1000220026.tar.gz", hash = "sha256:60644f798277b4008d4b00c9c8c885f6b8cc2d55c2d296ee5f82d763e60e7455", size = 590673 } wheels = [ - { url = "https://files.pythonhosted.org/packages/97/ea/b62a4b9a4df06db0b8518365ca090acc990947250eb3b4c7fb0f89c2df25/uipath_llamaindex-0.0.17-py3-none-any.whl", hash = "sha256:7bd1227d5e0216c7349bdaf819cb7168cfe0f6cc332462535d4b098629615186", size = 16550 }, + { url = "https://test-files.pythonhosted.org/packages/80/c7/47c37cc4162d90e7be67123af7d101680629003f27284ba8a6190aa87334/uipath_llamaindex-0.0.20.dev1000220026-py3-none-any.whl", hash = "sha256:9a3ba88bf8003a8aeb2a0440e34b7cc7c1a571b957c599927149c605e3aa58e6", size = 19396 }, ] [[package]] diff --git a/src/uipath_llamaindex/_cli/_runtime/_hitl.py b/src/uipath_llamaindex/_cli/_runtime/_hitl.py new file mode 100644 index 0000000..f2cce4e --- /dev/null +++ b/src/uipath_llamaindex/_cli/_runtime/_hitl.py @@ -0,0 +1,194 @@ +import json +import uuid +from dataclasses import dataclass +from functools import cached_property +from typing import Any, Optional + +from llama_index.core.workflow import InputRequiredEvent +from uipath import UiPath +from uipath._cli._runtime._contracts import ( + UiPathApiTrigger, + UiPathErrorCategory, + UiPathResumeTrigger, + UiPathResumeTriggerType, + UiPathRuntimeError, + UiPathRuntimeStatus, +) +from uipath.models import CreateAction, InvokeProcess, WaitAction, WaitJob + +uipath = UiPath() + + +def _try_convert_to_json_format(value: str) -> str: + try: + return json.loads(value) + except json.decoder.JSONDecodeError: + return value + + +async def _get_api_payload(inbox_id: str) -> Any: + """ + Fetch payload data for API triggers. + + Args: + inbox_id: The Id of the inbox to fetch the payload for. + + Returns: + The value field from the API response payload, or None if an error occurs. + """ + response = None + try: + response = uipath.api_client.request( + "GET", + f"/orchestrator_/api/JobTriggers/GetPayload/{inbox_id}", + include_folder_headers=True, + ) + data = response.json() + return data.get("payload") + except Exception as e: + raise UiPathRuntimeError( + "API_CONNECTION_ERROR", + "Failed to get trigger payload", + f"Error fetching API trigger payload for inbox {inbox_id}: {str(e)}", + UiPathErrorCategory.SYSTEM, + response.status_code if response else None, + ) from e + + +class HitlReader: + @classmethod + async def read(cls, resume_trigger: UiPathResumeTrigger) -> Optional[str]: + match resume_trigger.trigger_type: + case UiPathResumeTriggerType.ACTION: + if resume_trigger.item_key: + action = await uipath.actions.retrieve_async( + resume_trigger.item_key, + app_folder_key=resume_trigger.folder_key, + app_folder_path=resume_trigger.folder_path, + ) + return action.data + + case UiPathResumeTriggerType.JOB: + if resume_trigger.item_key: + job = await uipath.jobs.retrieve_async( + resume_trigger.item_key, + folder_key=resume_trigger.folder_key, + folder_path=resume_trigger.folder_path, + ) + if ( + job.state + and not job.state.lower() + == UiPathRuntimeStatus.SUCCESSFUL.value.lower() + ): + raise UiPathRuntimeError( + "INVOKED_PROCESS_FAILURE", + "Invoked process did not finish successfully.", + _try_convert_to_json_format(str(job.job_error or job.info)), + ) + return job.output_arguments + + case UiPathResumeTriggerType.API: + if resume_trigger.api_resume.inbox_id: + return await _get_api_payload(resume_trigger.api_resume.inbox_id) + + case _: + raise UiPathRuntimeError( + "UNKNOWN_TRIGGER_TYPE", + "Unexpected trigger type received", + f"Trigger type :{type(resume_trigger.trigger_type)} is invalid", + UiPathErrorCategory.USER, + ) + + raise UiPathRuntimeError( + "HITL_FEEDBACK_FAILURE", + "Failed to receive payload from HITL action", + detail="Failed to receive payload from HITL action", + category=UiPathErrorCategory.SYSTEM, + ) + + +@dataclass +class HitlProcessor: + """Processes events in a Human-(Robot/Agent)-In-The-Loop scenario.""" + + value: Any + + @cached_property + def type(self) -> Optional[UiPathResumeTriggerType]: + """Returns the type of the interrupt value.""" + if isinstance(self.value, CreateAction) or isinstance(self.value, WaitAction): + return UiPathResumeTriggerType.ACTION + if isinstance(self.value, InvokeProcess) or isinstance(self.value, WaitJob): + return UiPathResumeTriggerType.JOB + if isinstance(self.value, InputRequiredEvent): + return UiPathResumeTriggerType.API + return UiPathResumeTriggerType.NONE + + async def create_resume_trigger(self) -> Optional[UiPathResumeTrigger]: + """Returns the resume trigger.""" + try: + hitl_input = self.value + resume_trigger = UiPathResumeTrigger( + triggerType=self.type, interruptObject=hitl_input.model_dump_json() + ) + match self.type: + case UiPathResumeTriggerType.ACTION: + resume_trigger.folder_path = hitl_input.app_folder_path + resume_trigger.folder_key = hitl_input.app_folder_key + if isinstance(hitl_input, WaitAction): + resume_trigger.item_key = hitl_input.action.key + elif isinstance(hitl_input, CreateAction): + action = await uipath.actions.create_async( + title=hitl_input.title, + app_name=hitl_input.app_name if hitl_input.app_name else "", + app_folder_path=hitl_input.app_folder_path + if hitl_input.app_folder_path + else "", + app_folder_key=hitl_input.app_folder_key + if hitl_input.app_folder_key + else "", + app_key=hitl_input.app_key if hitl_input.app_key else "", + app_version=hitl_input.app_version + if hitl_input.app_version + else 1, + assignee=hitl_input.assignee if hitl_input.assignee else "", + data=hitl_input.data, + ) + if action: + resume_trigger.item_key = action.key + + case UiPathResumeTriggerType.JOB: + resume_trigger.folder_path = hitl_input.process_folder_path + resume_trigger.folder_key = hitl_input.process_folder_key + if isinstance(hitl_input, WaitJob): + resume_trigger.item_key = hitl_input.job.key + elif isinstance(hitl_input, InvokeProcess): + job = await uipath.processes.invoke_async( + name=hitl_input.name, + input_arguments=hitl_input.input_arguments, + folder_path=hitl_input.process_folder_path, + folder_key=hitl_input.process_folder_key, + ) + if job: + resume_trigger.item_key = job.key + + case UiPathResumeTriggerType.API: + resume_trigger.api_resume = UiPathApiTrigger( + inboxId=str(uuid.uuid4()), request=hitl_input.prefix + ) + case _: + raise UiPathRuntimeError( + "UNKNOWN_HITL_MODEL", + "Unexpected model received", + f"{type(hitl_input)} is not a valid Human(Robot/Agent)-In-The-Loop model", + UiPathErrorCategory.USER, + ) + except Exception as e: + raise UiPathRuntimeError( + "HITL_ACTION_CREATION_FAILED", + "Failed to create HITL action", + f"{str(e)}", + UiPathErrorCategory.SYSTEM, + ) from e + + return resume_trigger diff --git a/src/uipath_llamaindex/_cli/_runtime/_runtime.py b/src/uipath_llamaindex/_cli/_runtime/_runtime.py index 93024cd..4236b22 100644 --- a/src/uipath_llamaindex/_cli/_runtime/_runtime.py +++ b/src/uipath_llamaindex/_cli/_runtime/_runtime.py @@ -2,9 +2,8 @@ import logging import os import pickle -import uuid from contextlib import suppress -from typing import Any, Optional, cast +from typing import Optional, cast from llama_index.core.workflow import ( Context, @@ -19,7 +18,6 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor from uipath import UiPath from uipath._cli._runtime._contracts import ( - UiPathApiTrigger, UiPathBaseRuntime, UiPathErrorCategory, UiPathResumeTrigger, @@ -30,6 +28,7 @@ from .._tracing._oteladapter import LlamaIndexExporter from ._context import UiPathLlamaIndexRuntimeContext from ._exception import UiPathLlamaIndexRuntimeError +from ._hitl import HitlProcessor, HitlReader logger = logging.getLogger(__name__) @@ -83,11 +82,13 @@ async def execute(self) -> Optional[UiPathRuntimeResult]: **self.context.input_json, ) - resume_trigger: UiPathResumeTrigger = None + resume_trigger: Optional[UiPathResumeTrigger] = None response_applied = False async for event in handler.stream_events(): + # log the received event on trace level if isinstance(event, InputRequiredEvent): + hitl_processor = HitlProcessor(value=event) if self.context.resume and not response_applied: # If we are resuming, we need to apply the response to the event stream. response_applied = True @@ -95,13 +96,8 @@ async def execute(self) -> Optional[UiPathRuntimeResult]: await self.get_response_event() ) else: - resume_trigger = UiPathResumeTrigger( - api_resume=UiPathApiTrigger( - inbox_id=str(uuid.uuid4()), request=event.prefix - ) - ) + resume_trigger = await hitl_processor.create_resume_trigger() break - print(event) if resume_trigger is None: output = await handler @@ -250,7 +246,7 @@ async def load_workflow_context(self): loaded_ctx_dict, serializer=serializer, ) - + # TODO check multiple HITL same agent resumed_trigger_data = loaded_ctx_dict["uipath_resume_trigger"] if resumed_trigger_data: self.context.resumed_trigger = cast( @@ -267,41 +263,15 @@ async def get_response_event(self) -> Optional[HumanResponseEvent]: if self.context.input_json: # If input_json is provided, use it to create a HumanResponseEvent return HumanResponseEvent(**self.context.input_json) - # If resumed_trigger is set, fetch the payload from the API + # If resumed_trigger is set, fetch the feedback if self.context.resumed_trigger: - inbox_id = self.context.resumed_trigger.api_resume.inbox_id - payload = await self._get_api_payload(inbox_id) - if payload: - return HumanResponseEvent(response=payload) + feedback = await HitlReader.read(self.context.resumed_trigger) + if feedback: + if isinstance(feedback, dict): + feedback = json.dumps(feedback) + return HumanResponseEvent(response=feedback) return None - async def _get_api_payload(self, inbox_id: str) -> Any: - """ - Fetch payload data for API triggers. - - Args: - inbox_id: The Id of the inbox to fetch the payload for. - - Returns: - The value field from the API response payload, or None if an error occurs. - """ - try: - response = self._uipath.api_client.request( - "GET", - f"/orchestrator_/api/JobTriggers/GetPayload/{inbox_id}", - include_folder_headers=True, - ) - data = response.json() - return data.get("payload") - except Exception as e: - raise UiPathLlamaIndexRuntimeError( - "API_CONNECTION_ERROR", - "Failed to get trigger payload", - f"Error fetching API trigger payload for inbox {inbox_id}: {str(e)}", - UiPathErrorCategory.SYSTEM, - response.status_code, - ) from e - def _serialize_object(self, obj): """Recursively serializes an object and all its nested components.""" # Handle Pydantic models diff --git a/src/uipath_llamaindex/_cli/_tracing/_oteladapter.py b/src/uipath_llamaindex/_cli/_tracing/_oteladapter.py index af92be4..6aa4a05 100644 --- a/src/uipath_llamaindex/_cli/_tracing/_oteladapter.py +++ b/src/uipath_llamaindex/_cli/_tracing/_oteladapter.py @@ -9,15 +9,15 @@ logger = logging.getLogger(__name__) -class LlamaIndexExporter(LlmOpsHttpExporter): +class LlamaIndexExporter(LlmOpsHttpExporter): # Mapping of old attribute names to new attribute names or (new name, function) ATTRIBUTE_MAPPING = { "input.value": ("input", lambda s: json.loads(s)), "output.value": ("output", lambda s: json.loads(s)), "llm.model_name": "model", } - + # Mapping of span types SPAN_TYPE_MAPPING = { "LLM": "completion", @@ -41,9 +41,11 @@ def _send_with_retries( # Remove the span kind attribute span_type = attributes["openinference.span.kind"] # Map span type using SPAN_TYPE_MAPPING - span_data["SpanType"] = self.SPAN_TYPE_MAPPING.get(span_type, span_type) + span_data["SpanType"] = self.SPAN_TYPE_MAPPING.get( + span_type, span_type + ) del attributes["openinference.span.kind"] - + # Apply the transformation logic for old_key, mapping in self.ATTRIBUTE_MAPPING.items(): if old_key in attributes: @@ -57,14 +59,14 @@ def _send_with_retries( new_key = mapping attributes[new_key] = attributes[old_key] del attributes[old_key] - + # Transform token usage data if present token_keys = { "llm.token_count.prompt": "promptTokens", - "llm.token_count.completion": "completionTokens", - "llm.token_count.total": "totalTokens" + "llm.token_count.completion": "completionTokens", + "llm.token_count.total": "totalTokens", } - + # Check if any token count keys exist if any(key in attributes for key in token_keys): usage = {} @@ -72,27 +74,28 @@ def _send_with_retries( if old_key in attributes: usage[new_key] = attributes[old_key] del attributes[old_key] - + # Add default values for BYO execution fields usage["isByoExecution"] = False # usage["executionDeploymentType"] = "PAYGO" - + # Add usage to attributes attributes["usage"] = usage - + # Clean up any other token count attributes - keys_to_remove = [k for k in attributes if k.startswith("llm.token_count.")] + keys_to_remove = [ + k for k in attributes if k.startswith("llm.token_count.") + ] for key in keys_to_remove: del attributes[key] - + # Convert back to JSON string span_data["attributes"] = json.dumps(attributes) except json.JSONDecodeError as e: logger.warning(f"Failed to parse attributes JSON: {e}") - return super()._send_with_retries( url=url, payload=payload, max_retries=max_retries, - ) \ No newline at end of file + ) diff --git a/src/uipath_llamaindex/_cli/cli_init.py b/src/uipath_llamaindex/_cli/cli_init.py index e4ff548..36ba9a1 100644 --- a/src/uipath_llamaindex/_cli/cli_init.py +++ b/src/uipath_llamaindex/_cli/cli_init.py @@ -124,7 +124,7 @@ def draw_all_possible_flows_mermaid( # Only one kind of `StopEvent` is allowed in a `Workflow`. current_stop_event = None for _, step_func in steps.items(): - step_config : StepConfig = getattr(step_func, "__step_config", None) + step_config: StepConfig = getattr(step_func, "__step_config", None) if step_config is None: continue @@ -176,7 +176,7 @@ def draw_all_possible_flows_mermaid( if event_id not in nodes: nodes.add(event_id) style = get_event_style(event_type) - mermaid_diagram.append(f' {event_id}([

{event_name}

]):::{style}') + mermaid_diagram.append(f" {event_id}([

{event_name}

]):::{style}") if issubclass(event_type, InputRequiredEvent): # Add node for conceptual external step @@ -241,7 +241,9 @@ def draw_all_possible_flows_mermaid( mermaid_diagram.append(" classDef externalStyle fill:#f2f0ff,line-height:1.2") mermaid_diagram.append(" classDef defaultEventStyle fill-opacity:0") mermaid_diagram.append(" classDef stopEventStyle fill:#bfb6fc") - mermaid_diagram.append(" classDef inputRequiredStyle fill:#f2f0ff,line-height:1.2") + mermaid_diagram.append( + " classDef inputRequiredStyle fill:#f2f0ff,line-height:1.2" + ) # Join all lines mermaid_string = "\n".join(mermaid_diagram) diff --git a/src/uipath_llamaindex/models/__init__.py b/src/uipath_llamaindex/models/__init__.py new file mode 100644 index 0000000..1fce6a0 --- /dev/null +++ b/src/uipath_llamaindex/models/__init__.py @@ -0,0 +1,13 @@ +from .hitl_models import ( + CreateActionEvent, + InvokeProcessEvent, + WaitActionEvent, + WaitJobEvent, +) + +__all__ = [ + "CreateActionEvent", + "WaitActionEvent", + "InvokeProcessEvent", + "WaitJobEvent", +] diff --git a/src/uipath_llamaindex/models/hitl_models.py b/src/uipath_llamaindex/models/hitl_models.py new file mode 100644 index 0000000..5c75812 --- /dev/null +++ b/src/uipath_llamaindex/models/hitl_models.py @@ -0,0 +1,18 @@ +from llama_index.core.workflow import InputRequiredEvent +from uipath.models import CreateAction, InvokeProcess, WaitAction, WaitJob + + +class InvokeProcessEvent(InvokeProcess, InputRequiredEvent): + pass + + +class WaitJobEvent(WaitJob, InputRequiredEvent): + pass + + +class CreateActionEvent(CreateAction, InputRequiredEvent): + pass + + +class WaitActionEvent(WaitAction, InputRequiredEvent): + pass diff --git a/uv.lock b/uv.lock index a612099..bbdaf96 100644 --- a/uv.lock +++ b/uv.lock @@ -2840,7 +2840,7 @@ wheels = [ [[package]] name = "uipath-llamaindex" -version = "0.0.11" +version = "0.0.20" source = { editable = "." } dependencies = [ { name = "llama-index" },