Skip to content

Commit 637773a

Browse files
authored
feat(agents): add configurable accumulated vs delta streaming with memory optimization (#3145)
1 parent ed29baf commit 637773a

File tree

3 files changed

+93
-13
lines changed

3 files changed

+93
-13
lines changed

camel/agents/chat_agent.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class StreamContentAccumulator:
142142

143143
def __init__(self):
144144
self.base_content = "" # Content before tool calls
145-
self.current_content = "" # Current streaming content
145+
self.current_content = [] # Accumulated streaming fragments
146146
self.tool_status_messages = [] # Accumulated tool status messages
147147

148148
def set_base_content(self, content: str):
@@ -151,7 +151,7 @@ def set_base_content(self, content: str):
151151

152152
def add_streaming_content(self, new_content: str):
153153
r"""Add new streaming content."""
154-
self.current_content += new_content
154+
self.current_content.append(new_content)
155155

156156
def add_tool_status(self, status_message: str):
157157
r"""Add a tool status message."""
@@ -160,16 +160,18 @@ def add_tool_status(self, status_message: str):
160160
def get_full_content(self) -> str:
161161
r"""Get the complete accumulated content."""
162162
tool_messages = "".join(self.tool_status_messages)
163-
return self.base_content + tool_messages + self.current_content
163+
current = "".join(self.current_content)
164+
return self.base_content + tool_messages + current
164165

165166
def get_content_with_new_status(self, status_message: str) -> str:
166167
r"""Get content with a new status message appended."""
167168
tool_messages = "".join([*self.tool_status_messages, status_message])
168-
return self.base_content + tool_messages + self.current_content
169+
current = "".join(self.current_content)
170+
return self.base_content + tool_messages + current
169171

170172
def reset_streaming_content(self):
171173
r"""Reset only the streaming content, keep base and tool status."""
172-
self.current_content = ""
174+
self.current_content = []
173175

174176

175177
class StreamingChatAgentResponse:
@@ -397,6 +399,10 @@ class ChatAgent(BaseAgent):
397399
step_timeout (Optional[float], optional): Timeout in seconds for the
398400
entire step operation. If None, no timeout is applied.
399401
(default: :obj:`None`)
402+
stream_accumulate (bool, optional): When True, partial streaming
403+
updates return accumulated content (current behavior). When False,
404+
partial updates return only the incremental delta. (default:
405+
:obj:`True`)
400406
"""
401407

402408
def __init__(
@@ -440,6 +446,7 @@ def __init__(
440446
retry_attempts: int = 3,
441447
retry_delay: float = 1.0,
442448
step_timeout: Optional[float] = None,
449+
stream_accumulate: bool = True,
443450
) -> None:
444451
if isinstance(model, ModelManager):
445452
self.model_backend = model
@@ -528,6 +535,7 @@ def __init__(
528535
self.retry_attempts = max(1, retry_attempts)
529536
self.retry_delay = max(0.0, retry_delay)
530537
self.step_timeout = step_timeout
538+
self.stream_accumulate = stream_accumulate
531539

532540
def reset(self):
533541
r"""Resets the :obj:`ChatAgent` to its initial state."""
@@ -3668,15 +3676,18 @@ def _create_streaming_response_with_accumulator(
36683676
) -> ChatAgentResponse:
36693677
r"""Create a streaming response using content accumulator."""
36703678

3671-
# Add new content to accumulator and get full content
3679+
# Add new content; only build full content when needed
36723680
accumulator.add_streaming_content(new_content)
3673-
full_content = accumulator.get_full_content()
3681+
if self.stream_accumulate:
3682+
message_content = accumulator.get_full_content()
3683+
else:
3684+
message_content = new_content
36743685

36753686
message = BaseMessage(
36763687
role_name=self.role_name,
36773688
role_type=self.role_type,
36783689
meta_dict={},
3679-
content=full_content,
3690+
content=message_content,
36803691
)
36813692

36823693
return ChatAgentResponse(
@@ -3686,7 +3697,7 @@ def _create_streaming_response_with_accumulator(
36863697
"id": response_id,
36873698
"usage": step_token_usage.copy(),
36883699
"finish_reasons": ["streaming"],
3689-
"num_tokens": self._get_token_count(full_content),
3700+
"num_tokens": self._get_token_count(message_content),
36903701
"tool_calls": tool_call_records or [],
36913702
"external_tool_requests": None,
36923703
"streaming": True,
@@ -3773,6 +3784,7 @@ def clone(self, with_memory: bool = False) -> ChatAgent:
37733784
tool_execution_timeout=self.tool_execution_timeout,
37743785
pause_event=self.pause_event,
37753786
prune_tool_calls_from_memory=self.prune_tool_calls_from_memory,
3787+
stream_accumulate=self.stream_accumulate,
37763788
)
37773789

37783790
# Copy memory if requested

examples/agents/chatagent_stream.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
# Create a streaming model
1919
streaming_model = ModelFactory.create(
2020
model_platform=ModelPlatformType.DEFAULT,
21-
model_type=ModelType.DEFAULT,
21+
model_type=ModelType.GPT_4O_MINI,
2222
model_config_dict={
2323
"stream": True,
2424
"stream_options": {"include_usage": True},
2525
},
2626
)
2727

28-
agent = ChatAgent(
28+
agent_accumulated = ChatAgent(
2929
system_message="You are a helpful assistant that provides detailed "
3030
"and informative responses.",
3131
model=streaming_model,
@@ -36,10 +36,29 @@
3636
"it impacts the environment."
3737

3838
# Get streaming response
39-
streaming_response = agent.step(user_message)
39+
streaming_response = agent_accumulated.step(user_message)
4040

4141
# Stream the response chunks
4242
for chunk_response in streaming_response:
4343
# Each chunk_response is a ChatAgentResponse with incremental content
4444
chunk_content = chunk_response.msgs[0].content
4545
print(chunk_content, end="", flush=True)
46+
47+
print("\n\n---\nDelta streaming mode (stream_accumulate=False):\n")
48+
49+
# Create an agent that yields delta chunks instead of accumulated content
50+
agent_delta = ChatAgent(
51+
system_message="You are a helpful assistant that provides concise "
52+
"and informative responses.",
53+
model=streaming_model,
54+
stream_accumulate=False, # Only yield the delta part per chunk
55+
)
56+
57+
# Get streaming response (delta chunks)
58+
streaming_response_delta = agent_delta.step(user_message)
59+
60+
# Stream only the delta content per chunk; printing reconstructs the full text
61+
for chunk_response in streaming_response_delta:
62+
delta_content = chunk_response.msgs[0].content
63+
print(delta_content, end="", flush=True)
64+
print()

test/agents/test_chat_agent.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from pydantic import BaseModel, Field
3131

3232
from camel.agents import ChatAgent
33-
from camel.agents.chat_agent import ToolCallingRecord
33+
from camel.agents.chat_agent import StreamContentAccumulator, ToolCallingRecord
3434
from camel.configs import ChatGPTConfig
3535
from camel.generators import SystemMessageGenerator
3636
from camel.memories import MemoryRecord
@@ -743,6 +743,55 @@ def test_chat_agent_stream_output(step_call_count=3):
743743
), f"Error in calling round {i+1}"
744744

745745

746+
@pytest.mark.model_backend
747+
def test_chat_agent_stream_accumulate_mode_accumulated():
748+
"""Verify accumulated streaming behavior (stream_accumulate=True)."""
749+
chunks = ["Hello", " ", "world"]
750+
step_usage = {
751+
"completion_tokens": 0,
752+
"prompt_tokens": 0,
753+
"total_tokens": 0,
754+
}
755+
756+
agent = ChatAgent()
757+
accumulator = StreamContentAccumulator()
758+
outputs = []
759+
for c in chunks:
760+
resp = agent._create_streaming_response_with_accumulator(
761+
accumulator, c, step_usage, "acc", []
762+
)
763+
outputs.append(resp.msg.content)
764+
765+
assert len(outputs) == 3
766+
assert outputs[0] == "Hello"
767+
assert outputs[1] == "Hello "
768+
assert outputs[2] == "Hello world"
769+
assert accumulator.get_full_content() == "Hello world"
770+
771+
772+
@pytest.mark.model_backend
773+
def test_chat_agent_stream_accumulate_mode_delta():
774+
"""Verify delta streaming behavior (stream_accumulate=False)."""
775+
chunks = ["Hello", " ", "world"]
776+
step_usage = {
777+
"completion_tokens": 0,
778+
"prompt_tokens": 0,
779+
"total_tokens": 0,
780+
}
781+
782+
agent = ChatAgent(stream_accumulate=False)
783+
accumulator = StreamContentAccumulator()
784+
outputs = []
785+
for c in chunks:
786+
resp = agent._create_streaming_response_with_accumulator(
787+
accumulator, c, step_usage, "delta", []
788+
)
789+
outputs.append(resp.msg.content)
790+
791+
assert outputs == chunks
792+
assert accumulator.get_full_content() == "Hello world"
793+
794+
746795
@pytest.mark.model_backend
747796
def test_set_output_language():
748797
system_message = BaseMessage(

0 commit comments

Comments (0)