Skip to content

Tracer: support traceable attribute #232

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/langchain/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-langchain"
version = "0.0.83"
version = "0.0.84"
description = "UiPath Langchain"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.9"
Expand Down
4 changes: 3 additions & 1 deletion sdk/langchain/uipath_langchain/_cli/_runtime/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
UiPathRuntimeResult,
)

from ..._utils import _instrument_traceable
from ...tracers import AsyncUiPathTracer
from ._context import LangGraphRuntimeContext
from ._exception import LangGraphRuntimeError
Expand Down Expand Up @@ -43,6 +44,7 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
Raises:
LangGraphRuntimeError: If execution fails
"""
_instrument_traceable()

await self.validate()

Expand Down Expand Up @@ -71,7 +73,7 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
callbacks: List[BaseCallbackHandler] = []

if self.context.job_id and self.context.tracing_enabled:
tracer = AsyncUiPathTracer()
tracer = AsyncUiPathTracer(context=self.context.trace_context)
await tracer.init_trace(
self.context.entrypoint, self.context.job_id
)
Expand Down
3 changes: 2 additions & 1 deletion sdk/langchain/uipath_langchain/_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from ..tracers._instrument_traceable import _instrument_traceable
from ._request_mixin import UiPathRequestMixin

__all__ = ["UiPathRequestMixin"]
__all__ = ["UiPathRequestMixin", "_instrument_traceable"]
163 changes: 106 additions & 57 deletions sdk/langchain/uipath_langchain/tracers/AsyncUiPathTracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import uuid
import warnings
from os import environ as env
from typing import Any, Optional
from typing import Any, Dict, Optional

import httpx
from langchain_core.tracers.base import AsyncBaseTracer
from langchain_core.tracers.schemas import Run
from pydantic import PydanticDeprecationWarning
from uipath_sdk._cli._runtime._contracts import UiPathTraceContext

from ._events import CustomTraceEvents, FunctionCallEventData
from ._utils import _setup_tracer_httpx_logging, _simple_serialize_defaults

logger = logging.getLogger(__name__)
Expand All @@ -27,78 +29,98 @@ class Status:


class AsyncUiPathTracer(AsyncBaseTracer):
def __init__(self, client=None, **kwargs):
def __init__(
self,
context: Optional[UiPathTraceContext] = None,
client: Optional[httpx.AsyncClient] = None,
**kwargs,
):
super().__init__(**kwargs)

self.client = client or httpx.AsyncClient()
self.retries = 3
self.log_queue: queue.Queue[dict[str, Any]] = queue.Queue()

self.context = context or UiPathTraceContext()

llm_ops_pattern = self._get_base_url() + "{orgId}/llmops_"
self.orgId = env.get(
"UIPATH_ORGANIZATION_ID", "00000000-0000-0000-0000-000000000000"
)
self.tenantId = env.get(
"UIPATH_TENANT_ID", "00000000-0000-0000-0000-000000000000"
)
self.url = llm_ops_pattern.format(orgId=self.orgId).rstrip("/")

self.auth_token = env.get("UNATTENDED_USER_ACCESS_TOKEN") or env.get(
self.url = llm_ops_pattern.format(orgId=self.context.org_id).rstrip("/")

auth_token = env.get("UNATTENDED_USER_ACCESS_TOKEN") or env.get(
"UIPATH_ACCESS_TOKEN"
)

self.jobKey = env.get("UIPATH_JOB_KEY")
self.folderKey = env.get("UIPATH_FOLDER_KEY")
self.processKey = env.get("UIPATH_PROCESS_UUID")
self.parent_span_id = env.get("UIPATH_PARENT_SPAN_ID")

self.referenceId = self.jobKey or str(uuid.uuid4())

self.headers = {
"Authorization": f"Bearer {self.auth_token}",
}
self.headers = {"Authorization": f"Bearer {auth_token}"}

self.running = True
self.worker_task = asyncio.create_task(self._worker())
self.function_call_run_map: Dict[str, Run] = {}

async def on_custom_event(
self,
name: str,
data: Any,
*,
run_id: uuid.UUID,
tags=None,
metadata=None,
**kwargs: Any,
) -> None:
if name == CustomTraceEvents.UIPATH_TRACE_FUNCTION_CALL:
# only handle the function call event

if not isinstance(data, FunctionCallEventData):
logger.warning(
f"Received unexpected data type for function call event: {type(data)}"
)
return

def _get_base_url(self) -> str:
uipath_url = (
env.get("UIPATH_URL") or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
)
uipath_url = uipath_url.rstrip("/")
if data.event_type == "call":
run = self.run_map[str(run_id)]
child_run = run.create_child(
name=data.function_name, run_type=data.run_type, tags=data.tags
)

# split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
parts = uipath_url.split("//")
if data.metadata is not None:
run.add_metadata(data.metadata)

# after splitting by //, the base URL will be at index 1 along with the rest,
# hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
base_url_parts = parts[1].split("/")
call_uuid = data.call_uuid
self.function_call_run_map[call_uuid] = child_run

# combine scheme and netloc to get the base URL
base_url = parts[0] + "//" + base_url_parts[0] + "/"
self._send_span(run)

return base_url
if data.event_type == "completion":
call_uuid = data.call_uuid
previous_run = self.function_call_run_map.pop(call_uuid, None)

if previous_run:
previous_run.end(
outputs=self._safe_dict_dump(data.output), error=data.error
)
self._send_span(previous_run)

async def init_trace(self, run_name, trace_id=None) -> None:
trace_id_env = env.get("UIPATH_TRACE_ID")
if self.context.trace_id:
# trace id already set no need to do anything
return

if trace_id_env:
self.trace_parent = trace_id_env
else:
await self.start_trace(run_name, trace_id)
# no trace id, start a new trace
await self.start_trace(run_name, trace_id)

async def start_trace(self, run_name, trace_id=None) -> None:
self.trace_parent = trace_id or str(uuid.uuid4())
run_name = run_name or f"Job Run: {self.trace_parent}"
self.context.trace_id = str(uuid.uuid4())

run_name = run_name or f"Job Run: {self.context.trace_id}"
trace_data = {
"id": self.trace_parent,
"id": self.context.trace_id,
"name": re.sub(
"[!@#$<>\.]", "", run_name
), # if we use these characters the Agents UI throws some error (but llmops backend seems fine)
"referenceId": self.referenceId,
"referenceId": self.context.reference_id,
"attributes": "{}",
"organizationId": self.orgId,
"tenantId": self.tenantId,
"organizationId": self.context.org_id,
"tenantId": self.context.tenant_id,
}

for attempt in range(self.retries):
Expand Down Expand Up @@ -176,9 +198,9 @@ async def _worker(self):

async def _persist_run(self, run: Run) -> None:
# Determine if this is a start or end trace based on whether end_time is set
await self._send_span(run)
self._send_span(run)

async def _send_span(self, run: Run) -> None:
def _send_span(self, run: Run) -> None:
"""Send span data for a run to the API"""
run_id = str(run.id)

Expand All @@ -193,27 +215,27 @@ async def _send_span(self, run: Run) -> None:
parent_id = (
str(run.parent_run_id)
if run.parent_run_id is not None
else self.parent_span_id
else self.context.parent_span_id
)
attributes = self._safe_json_dump(self._run_to_dict(run))
attributes = self._safe_jsons_dump(self._run_to_dict(run))
status = self._determine_status(run.error)

span_data = {
"id": run_id,
"parentId": parent_id,
"traceId": self.trace_parent,
"traceId": self.context.trace_id,
"name": run.name,
"startTime": start_time,
"endTime": end_time,
"referenceId": self.referenceId,
"referenceId": self.context.reference_id,
"attributes": attributes,
"organizationId": self.orgId,
"tenantId": self.tenantId,
"organizationId": self.context.org_id,
"tenantId": self.context.tenant_id,
"spanType": "LangGraphRun",
"status": status,
"jobKey": self.jobKey,
"folderKey": self.folderKey,
"processKey": self.processKey,
"jobKey": self.context.job_id,
"folderKey": self.context.folder_key,
"processKey": self.context.folder_key,
}

self.log_queue.put(span_data)
Expand All @@ -237,14 +259,23 @@ def _determine_status(self, error: Optional[str]):

return Status.SUCCESS

def _safe_json_dump(self, obj) -> str:
def _safe_jsons_dump(self, obj) -> str:
try:
json_str = json.dumps(obj, default=_simple_serialize_defaults)
return json_str
except Exception as e:
logger.warning(e)
logger.warning(f"Error serializing object to JSON: {e}")
return "{ }"

def _safe_dict_dump(self, obj) -> Dict[str, Any]:
try:
serialized = json.loads(json.dumps(obj, default=_simple_serialize_defaults))
return serialized
except Exception as e:
# Last resort - string representation
logger.warning(f"Error serializing object to JSON: {e}")
return {"raw": str(obj)}

def _run_to_dict(self, run: Run):
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=PydanticDeprecationWarning)
Expand All @@ -254,3 +285,21 @@ def _run_to_dict(self, run: Run):
"inputs": run.inputs.copy() if run.inputs is not None else None,
"outputs": run.outputs.copy() if run.outputs is not None else None,
}

def _get_base_url(self) -> str:
uipath_url = (
env.get("UIPATH_URL") or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
)
uipath_url = uipath_url.rstrip("/")

# split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
parts = uipath_url.split("//")

# after splitting by //, the base URL will be at index 1 along with the rest,
# hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
base_url_parts = parts[1].split("/")

# combine scheme and netloc to get the base URL
base_url = parts[0] + "//" + base_url_parts[0] + "/"

return base_url
3 changes: 2 additions & 1 deletion sdk/langchain/uipath_langchain/tracers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ._instrument_traceable import _instrument_traceable
from .AsyncUiPathTracer import AsyncUiPathTracer
from .UiPathTracer import UiPathTracer

__all__ = ["AsyncUiPathTracer", "UiPathTracer"]
__all__ = ["AsyncUiPathTracer", "UiPathTracer", "_instrument_traceable"]
33 changes: 33 additions & 0 deletions sdk/langchain/uipath_langchain/tracers/_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import Any, Dict, List, Literal, Optional

RUN_TYPE_T = Literal[
"tool", "chain", "llm", "retriever", "embedding", "prompt", "parser"
]


class CustomTraceEvents:
UIPATH_TRACE_FUNCTION_CALL = "__uipath_trace_function_call"


class FunctionCallEventData:
def __init__(
self,
function_name: str,
event_type: str,
inputs: Dict[str, Any],
call_uuid: str,
output: Any,
error: str,
run_type: Optional[RUN_TYPE_T] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
):
self.function_name = function_name
self.event_type = event_type
self.inputs = inputs
self.call_uuid = call_uuid
self.output = output
self.error = error
self.run_type = run_type or "chain"
self.tags = tags
self.metadata = metadata
Loading
Loading