Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
5a69b0a
wip: add strands integration core files
statefb Jul 31, 2025
184f834
update deps
statefb Jul 31, 2025
6666daa
wip
statefb Jul 31, 2025
08ed418
update firecrawl-py version
statefb Aug 1, 2025
804ed54
insert debug logs
statefb Aug 1, 2025
280ff15
fix: on_stop impl
statefb Aug 1, 2025
6c6719f
wip
statefb Aug 1, 2025
a0ae60c
Fix reasoning content extraction from Strands AgentResult
statefb Aug 1, 2025
88f9085
fix: tool use id conversion
statefb Aug 2, 2025
f868e5d
fix: internet
statefb Aug 4, 2025
bb004a2
add debug log on websocket.py
statefb Aug 5, 2025
f4b54b7
add pytest
statefb Aug 5, 2025
045817b
add debug log on usePostMessageStreaming
statefb Aug 5, 2025
81eceb7
fix: tool input / output not displayed
statefb Aug 5, 2025
bcb27d5
fix: reasoning not persist
statefb Aug 5, 2025
af402d6
add calc tool for testing
statefb Aug 5, 2025
fc213a2
fix: multi turn conversation
statefb Aug 5, 2025
4012b5a
fix: tool registry
statefb Aug 6, 2025
9093e40
fix: wait complete tool input
statefb Aug 6, 2025
3399dca
fix: citation
statefb Aug 6, 2025
b54449a
fix: tool registry
statefb Aug 6, 2025
be189d4
fix: tool input consistency
statefb Aug 6, 2025
810a700
fix: support list
statefb Aug 7, 2025
19f7ac5
fix: list citation
statefb Aug 7, 2025
46f9385
fix: citation
statefb Aug 7, 2025
19c4640
remove context
statefb Aug 7, 2025
733d7a9
fix: knowledge tool strands to return list
statefb Aug 7, 2025
07d1798
refactor
statefb Aug 7, 2025
73c77ba
update strands version
statefb Aug 7, 2025
72def29
wip: chat strands refactor
statefb Aug 7, 2025
8e92411
refactor: call back handler
statefb Aug 7, 2025
13503b2
add post processing
statefb Aug 7, 2025
56f81c5
fix: tools / utils
statefb Aug 8, 2025
75b4ff4
fix: attachment docs
statefb Aug 8, 2025
28ca754
fix: image content
statefb Aug 8, 2025
7187fbe
fix: continue generation
statefb Aug 8, 2025
d1ef26f
fix: Skip instruction messages as they are handled separately via mes…
statefb Aug 8, 2025
64a2f36
change log level for websocket.py
statefb Aug 8, 2025
8f2f322
lint and add comment
statefb Aug 8, 2025
47a594b
remove deprecated refactorings
statefb Aug 12, 2025
d35a229
fix: unittest
statefb Aug 12, 2025
86bee54
fix tools to return result as strands formats
statefb Aug 12, 2025
b3471a6
rename modules
statefb Aug 12, 2025
078b767
fix: skip reasoning / tool content to construct strands message befor…
statefb Aug 13, 2025
4d207b6
fix: knowledge_search
statefb Aug 14, 2025
6700468
fix: source id citation
statefb Aug 15, 2025
39a9bc6
add: prompt cache (system, tool)
statefb Aug 28, 2025
2ce970d
add message cache
statefb Aug 28, 2025
54508b8
insert debug log
statefb Aug 28, 2025
3f9d42c
return empty list when no tool
statefb Aug 28, 2025
72181ee
fix: tool util
statefb Aug 29, 2025
770ec40
fix bedrock agent tool
statefb Aug 29, 2025
ae62ab2
add deprecated decorator
statefb Aug 29, 2025
fffef05
update documents including examples
statefb Aug 29, 2025
cd59fd6
add deprecation decorator
statefb Aug 29, 2025
0f1d7bc
switch fetch_available_agent_tools for strands
statefb Aug 29, 2025
cf031d8
refactor modules for readability
statefb Aug 29, 2025
603a486
chore: mypy
statefb Aug 29, 2025
a768b83
refactor: simplify on_stop lambda in process_chat_input
statefb Aug 29, 2025
61fd258
remove unused imports on routes/bot.py
statefb Sep 1, 2025
19f20ec
fix: support legacy for `fetch_available_agent_tools`
statefb Sep 1, 2025
496e446
chore: lint
statefb Sep 1, 2025
1e13c42
remove unused tests
statefb Sep 1, 2025
54b5398
feat: implement telemetry management and data extraction for Strands …
statefb Sep 2, 2025
7c0868f
convert relative import to absolute
statefb Sep 3, 2025
dac0589
refactor: reorganize imports and remove console exporter setup
statefb Sep 3, 2025
2bc6450
Merge branch 'v3' into refactor-strands
statefb Sep 3, 2025
b29246b
add type notation
statefb Sep 3, 2025
1c25b62
fix: change back `bedrock_agent_invoke` to original: `bedrock_agent`
statefb Sep 3, 2025
2a81333
fix: change back `knowledge_search` to original: `knowledge_base_tool`
statefb Sep 3, 2025
83ce685
chore: lint
statefb Sep 3, 2025
1b20fd8
fix: tool example
statefb Sep 4, 2025
8c19b5f
add RAG support for model which does not support tool
statefb Sep 4, 2025
c8614f7
Merge branch 'v3' into refactor-strands
Yukinobu-Mine Sep 18, 2025
1263fa2
Merge branch 'v3' into refactor-strands
Yukinobu-Mine Sep 19, 2025
48a1ea5
Refactor implementation of strands agents migration.
Yukinobu-Mine Sep 16, 2025
a8946b3
refactor: add _ prefix to local helper functions in strands converters
statefb Oct 14, 2025
9728684
Merge branch 'v3' into refactor-strands
statefb Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions backend/app/strands_integration/chat_strands.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from .handlers import ToolResultCapture, create_callback_handler
from .processors import post_process_strands_result
from .telemetry import StrandsTelemetryManager

logger = logging.getLogger(__name__)

Expand All @@ -43,6 +44,36 @@ def chat_with_strands(
on_tool_result: Callable[[ToolRunResult], None] | None = None,
on_reasoning: Callable[[str], None] | None = None,
) -> tuple[ConversationModel, MessageModel]:
"""
Chat with Strands agents.

Architecture Overview:

1. Reasoning Content:
- Streaming: CallbackHandler processes reasoning events for real-time display
- Persistence: Telemetry (ReasoningSpanProcessor) extracts from OpenTelemetry spans

2. Tool Use/Result (Thinking Log):
- Streaming: ToolResultCapture processes tool events for real-time display
- Persistence: ToolResultCapture stores processed data for DynamoDB storage

3. Related Documents (Citations):
- Source: ToolResultCapture only
- Reason: Requires access to raw tool results for source_link extraction

Why This Hybrid Approach:

- ToolResultCapture: Processes raw tool results during execution hooks, enabling
source_link extraction and citation functionality. Telemetry only captures
post-processed data, losing metadata required for citations.

- Telemetry: Captures complete reasoning content from OpenTelemetry spans,
providing reliable persistence for reasoning data that may not be available
in final AgentResult when tools are used.

- CallbackHandler: Handles real-time streaming of reasoning content during
agent execution for immediate user feedback.
"""
user_msg_id, conversation, bot = prepare_conversation(user, chat_input)

display_citation = bot is not None and bot.display_retrieved_chunks
Expand Down Expand Up @@ -88,6 +119,10 @@ def chat_with_strands(

continue_generate = chat_input.continue_generate

# Setup telemetry manager for reasoning capture
telemetry_manager = StrandsTelemetryManager()
telemetry_manager.setup(conversation.id, user.id)

# Create ToolResultCapture to capture tool execution data
tool_capture = ToolResultCapture(
on_thinking=on_thinking,
Expand All @@ -104,8 +139,6 @@ def chat_with_strands(

agent.callback_handler = create_callback_handler(
on_stream=on_stream,
on_thinking=on_thinking,
on_tool_result=on_tool_result,
on_reasoning=on_reasoning,
)

Expand Down Expand Up @@ -186,6 +219,7 @@ def chat_with_strands(
user=user,
model_name=chat_input.message.model,
continue_generate=continue_generate,
telemetry_manager=telemetry_manager,
tool_capture=tool_capture,
on_stop=on_stop,
)
10 changes: 0 additions & 10 deletions backend/app/strands_integration/handlers/callback_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ def __call__(self, **kwargs):
reasoning_text = kwargs.get("reasoningText", "")
self.on_reasoning(reasoning_text)
self.collected_reasoning.append(reasoning_text)
elif "thinking" in kwargs and self.on_reasoning:
thinking_text = kwargs.get("thinking", "")
self.on_reasoning(thinking_text)
self.collected_reasoning.append(thinking_text)
# elif "event" in kwargs:
# event = kwargs["event"]
# print(f"[STRANDS_CALLBACK] Event: {event}")
# elif "message" in kwargs:
# message = kwargs["message"]
# print(f"[STRANDS_CALLBACK] Message: {message}")


def create_callback_handler(
Expand Down
23 changes: 17 additions & 6 deletions backend/app/strands_integration/processors/result_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from ..converters.message_converter import convert_strands_message_to_message_model
from ..handlers.tool_result_capture import ToolResultCapture
from ..telemetry.telemetry_manager import StrandsTelemetryManager
from .cost_calculator import calculate_conversation_cost
from .document_extractor import (
build_thinking_log_from_tool_capture,
Expand Down Expand Up @@ -59,6 +60,7 @@ def post_process_strands_result(
user: User,
model_name: type_model_name,
continue_generate: bool,
telemetry_manager: StrandsTelemetryManager,
tool_capture: ToolResultCapture,
on_stop: Callable[[OnStopInput], None] | None = None,
) -> tuple[ConversationModel, MessageModel]:
Expand All @@ -78,12 +80,21 @@ def post_process_strands_result(
conversation.total_price += price
conversation.should_continue = result.stop_reason == "max_tokens"

# 3. Build thinking_log from tool capture
# Extract reasoning content from telemetry
from ..telemetry import TelemetryDataExtractor

data_extractor = TelemetryDataExtractor(telemetry_manager.reasoning_processor)

reasoning_contents = data_extractor.extract_reasoning_content()
if reasoning_contents:
message.content.extend(reasoning_contents)

# Build thinking_log from tool capture
thinking_log = build_thinking_log_from_tool_capture(tool_capture)
if thinking_log:
message.thinking_log = thinking_log

# 4. Set message parent and generate assistant message ID
# 5. Set message parent and generate assistant message ID
message.parent = user_msg_id

if continue_generate:
Expand All @@ -108,12 +119,12 @@ def post_process_strands_result(
conversation.message_map[user_msg_id].children.append(assistant_msg_id)
conversation.last_message_id = assistant_msg_id

# 5. Extract related documents from tool capture
# Extract related documents from tool capture
related_documents = extract_related_documents_from_tool_capture(
tool_capture, assistant_msg_id
)

# 6. Store conversation and related documents
# 7. Store conversation and related documents
store_conversation(user.id, conversation)
if related_documents:
store_related_documents(
Expand All @@ -122,12 +133,12 @@ def post_process_strands_result(
related_documents=related_documents,
)

# 7. Call on_stop callback
# 8. Call on_stop callback
if on_stop:
on_stop_input = create_on_stop_input(result, message, price)
on_stop(on_stop_input)

# 8. Update bot statistics
# 9. Update bot statistics
if bot:
logger.debug("Bot is provided. Updating bot last used time.")
modify_bot_last_used_time(user, bot)
Expand Down
9 changes: 9 additions & 0 deletions backend/app/strands_integration/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .telemetry_manager import StrandsTelemetryManager
from .processors import ReasoningSpanProcessor
from .data_extractor import TelemetryDataExtractor

__all__ = [
"StrandsTelemetryManager",
"ReasoningSpanProcessor",
"TelemetryDataExtractor",
]
22 changes: 22 additions & 0 deletions backend/app/strands_integration/telemetry/data_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
Data extraction utilities for Strands telemetry.
"""

import logging

from app.repositories.models.conversation import ReasoningContentModel

from .processors import ReasoningSpanProcessor

logger = logging.getLogger(__name__)


class TelemetryDataExtractor:
    """Turns telemetry span-processor state into structured records.

    Acts as a thin read-side facade: callers hand it the processor that
    accumulated data during agent execution and ask it for the results,
    rather than reaching into the processor directly.
    """

    def __init__(self, reasoning_processor: ReasoningSpanProcessor):
        # Processor that collected reasoning content while spans were emitted.
        self.reasoning_processor = reasoning_processor

    def extract_reasoning_content(self) -> list[ReasoningContentModel]:
        """Return the reasoning content captured by the underlying processor."""
        captured = self.reasoning_processor.get_reasoning_data()
        return captured
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .reasoning_processor import ReasoningSpanProcessor

__all__ = [
"ReasoningSpanProcessor",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""
Reasoning span processor for Strands telemetry.
"""

import json
import logging
from typing import Any, Optional

from app.repositories.models.conversation import ReasoningContentModel
from opentelemetry.context import Context
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor

logger = logging.getLogger(__name__)


class ReasoningSpanProcessor(SpanProcessor):
    """Processes spans to extract reasoning content for DynamoDB storage.

    Watches for the end of Strands "execute_event_loop_cycle" spans and
    harvests any reasoning text embedded in their ``gen_ai.choice`` events,
    accumulating the results for later retrieval via get_reasoning_data().
    """

    def __init__(self) -> None:
        # Reasoning blocks collected across all processed spans.
        self.reasoning_data: list[ReasoningContentModel] = []
        # Conversation context, populated via set_context().
        self.conversation_id: str = ""
        self.user_id: str = ""

    def set_context(self, conversation_id: str, user_id: str) -> None:
        """Set conversation context for this processor."""
        self.conversation_id = conversation_id
        self.user_id = user_id

    def on_start(
        self, span: ReadableSpan, parent_context: Optional[Context] = None
    ) -> None:
        """Called when a span starts (no-op; extraction happens on_end)."""

    def on_end(self, span: ReadableSpan) -> None:
        """Called when a span ends - extract reasoning content."""
        # Only event-loop-cycle spans carry the gen_ai.choice events we need.
        if span.name != "execute_event_loop_cycle":
            return
        logger.debug(f"Processing Cycle span: {span.name}")
        extracted = self._extract_reasoning_from_span(span)
        if not extracted:
            logger.debug(f"No reasoning content found in span: {span.name}")
            return
        self.reasoning_data.append(extracted)
        logger.debug(f"Extracted reasoning content from span: {span.name}")

    def shutdown(self) -> None:
        """Called when the processor is shutdown (nothing to release)."""

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Force flush any pending data; always succeeds immediately."""
        return True

    def get_reasoning_data(self) -> list[ReasoningContentModel]:
        """Get a shallow copy of the extracted reasoning data."""
        return list(self.reasoning_data)

    def _extract_reasoning_from_span(
        self, span: ReadableSpan
    ) -> Optional[ReasoningContentModel]:
        """
        Extract reasoning content from span events.

        Expected Data Structure:

        span.events contains gen_ai.choice events with the following structure:

        event.attributes["message"] = JSON string containing:
        [
            {
                "reasoningContent": {
                    "reasoningText": {
                        "text": "The user has provided what appears to be...",
                        "signature": "ErcBCkgIBxABGAIiQLG2dqOt..."
                    }
                }
            },
            {
                "text": "I'll calculate the result for you."
            },
            {
                "toolUse": {
                    "toolUseId": "tooluse_xxx",
                    "name": "calculator",
                    "input": {"expression": "5432/64526234"}
                }
            }
        ]
        """
        if not span.events:
            logger.debug("No events found in span")
            return None

        for event in span.events:
            # Skip anything that is not a well-formed gen_ai.choice event.
            if event.name != "gen_ai.choice" or event.attributes is None:
                continue

            logger.debug(f"Found gen_ai.choice event: {event.attributes.keys()}")
            try:
                message_attr = event.attributes.get("message")
                if not isinstance(message_attr, str):
                    continue

                blocks = json.loads(message_attr)
                logger.debug(f"Parsed message content: {len(blocks)} items")

                model = self._reasoning_model_from_blocks(blocks)
                if model is not None:
                    return model
            except (json.JSONDecodeError, KeyError) as e:
                # Malformed event payloads are logged and skipped, not fatal.
                logger.warning(f"Failed to parse reasoning content from event: {e}")

        logger.debug("No reasoning content found in any events")
        return None

    @staticmethod
    def _reasoning_model_from_blocks(blocks: Any) -> Optional[ReasoningContentModel]:
        """Return a model for the first content block holding non-empty reasoning text."""
        for content_block in blocks:
            if "reasoningContent" not in content_block:
                continue
            reasoning_data = content_block["reasoningContent"]
            logger.debug(f"Found reasoningContent: {reasoning_data.keys()}")

            if "reasoningText" not in reasoning_data:
                continue
            reasoning_text_data = reasoning_data["reasoningText"]
            text = reasoning_text_data.get("text", "")
            signature = reasoning_text_data.get("signature", "")

            if text:
                logger.debug(f"Extracted reasoning text: {len(text)} chars")
                return ReasoningContentModel(
                    content_type="reasoning",
                    text=text,
                    signature=signature,
                    redacted_content=b"",
                )
        return None
32 changes: 32 additions & 0 deletions backend/app/strands_integration/telemetry/telemetry_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
Telemetry manager for Strands integration.
"""

import logging
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from strands.telemetry import StrandsTelemetry
from .processors import ReasoningSpanProcessor

logger = logging.getLogger(__name__)


class StrandsTelemetryManager:
    """Manages Strands telemetry setup and span processors.

    Owns a StrandsTelemetry instance plus a ReasoningSpanProcessor, and wires
    the processor into the global OpenTelemetry tracer provider so reasoning
    content emitted during agent execution can be captured for persistence.
    """

    def __init__(self) -> None:
        # Strands-side telemetry bootstrap (exporters, instrumentation).
        self.telemetry = StrandsTelemetry()
        # Custom span processor that accumulates reasoning content;
        # exposed so callers can hand it to TelemetryDataExtractor.
        self.reasoning_processor = ReasoningSpanProcessor()

    def setup(self, conversation_id: str, user_id: str) -> None:
        """Setup telemetry with custom span processors.

        Args:
            conversation_id: Conversation being processed; forwarded to the
                reasoning processor as extraction context.
            user_id: Owner of the conversation; forwarded likewise.
        """
        # Setup console exporter for development
        # NOTE(review): this runs unconditionally, so spans are also printed
        # to the console outside development — confirm this is intended.
        self.telemetry.setup_console_exporter()

        # Get the tracer provider and add our custom processors
        # isinstance guard: the global provider may be a no-op proxy rather
        # than an SDK TracerProvider, which has no add_span_processor.
        tracer_provider = trace.get_tracer_provider()
        if isinstance(tracer_provider, TracerProvider):
            tracer_provider.add_span_processor(self.reasoning_processor)
            logger.debug("Added custom span processors to tracer provider")

        self.reasoning_processor.set_context(conversation_id, user_id)
5 changes: 5 additions & 0 deletions backend/tests/test_usecases/test_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,11 @@ class TestAgentChat(unittest.TestCase):
model: type_model_name = "claude-v3.7-sonnet"

def setUp(self) -> None:
# Enable debug logging for telemetry processors
import logging

logging.getLogger("app.strands_integration.telemetry").setLevel(logging.DEBUG)
logging.basicConfig(level=logging.DEBUG)
private_bot = create_test_private_bot(
self.bot_id,
True,
Expand Down