
Commit 5c7baea

chore: enhance usage with stream mode (#3039)
Co-authored-by: Wendong-Fan <133094783+Wendong-Fan@users.noreply.github.com>
Co-authored-by: Wendong-Fan <w3ndong.fan@gmail.com>
1 parent 744be31 commit 5c7baea

2 files changed: +103 −16 lines

camel/agents/chat_agent.py

Lines changed: 88 additions & 15 deletions
```diff
@@ -2665,12 +2665,6 @@ def _process_stream_chunks_with_accumulator(
         stream_completed = False
 
         for chunk in stream:
-            # Update token usage if available
-            if chunk.usage:
-                self._update_token_usage_tracker(
-                    step_token_usage, safe_model_dump(chunk.usage)
-                )
-
             # Process chunk delta
             if chunk.choices and len(chunk.choices) > 0:
                 choice = chunk.choices[0]
@@ -2741,7 +2735,49 @@ def _process_stream_chunks_with_accumulator(
                 )
 
                 self.record_message(final_message)
-                break
+            elif chunk.usage and not chunk.choices:
+                # Handle final chunk with usage but empty choices
+                # This happens when stream_options={"include_usage": True}
+                # Update the final usage from this chunk
+                self._update_token_usage_tracker(
+                    step_token_usage, safe_model_dump(chunk.usage)
+                )
+
+                # Create final response with final usage
+                final_content = content_accumulator.get_full_content()
+                if final_content.strip():
+                    final_message = BaseMessage(
+                        role_name=self.role_name,
+                        role_type=self.role_type,
+                        meta_dict={},
+                        content=final_content,
+                    )
+
+                    if response_format:
+                        self._try_format_message(
+                            final_message, response_format
+                        )
+
+                    # Create final response with final usage (not partial)
+                    final_response = ChatAgentResponse(
+                        msgs=[final_message],
+                        terminated=False,
+                        info={
+                            "id": getattr(chunk, 'id', ''),
+                            "usage": step_token_usage.copy(),
+                            "finish_reasons": ["stop"],
+                            "num_tokens": self._get_token_count(final_content),
+                            "tool_calls": tool_call_records or [],
+                            "external_tool_requests": None,
+                            "streaming": False,
+                            "partial": False,
+                        },
+                    )
+                    yield final_response
+                break
+            elif stream_completed:
+                # If we've already seen finish_reason but no usage chunk, exit
+                break
 
         return stream_completed, tool_calls_complete
```
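
The new `elif chunk.usage and not chunk.choices` branch keys off the shape OpenAI uses when `stream_options={"include_usage": True}` is set: one trailing chunk whose `choices` list is empty and whose `usage` field carries aggregate token counts for the whole stream. A minimal sketch of that contract against the raw `openai` client (the model name and prompt are placeholders, not part of this commit):

```python
# Minimal sketch of the chunk shape the branch above handles; the model
# and prompt are placeholders, not taken from the commit.
from openai import OpenAI

client = OpenAI()
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say hello."}],
    stream=True,
    stream_options={"include_usage": True},
)
for chunk in stream:
    if chunk.choices:
        print(chunk.choices[0].delta.content or "", end="")
    elif chunk.usage:
        # Trailing chunk: empty choices, aggregate usage for the stream
        print(
            f"\nprompt={chunk.usage.prompt_tokens}, "
            f"completion={chunk.usage.completion_tokens}, "
            f"total={chunk.usage.total_tokens}"
        )
```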

```diff
@@ -3369,18 +3405,13 @@ async def _aprocess_stream_chunks_with_accumulator(
         response_format: Optional[Type[BaseModel]] = None,
     ) -> AsyncGenerator[Union[ChatAgentResponse, Tuple[bool, bool]], None]:
         r"""Async version of process streaming chunks with
-        content accumulator."""
+        content accumulator.
+        """
 
         tool_calls_complete = False
         stream_completed = False
 
         async for chunk in stream:
-            # Update token usage if available
-            if chunk.usage:
-                self._update_token_usage_tracker(
-                    step_token_usage, safe_model_dump(chunk.usage)
-                )
-
             # Process chunk delta
             if chunk.choices and len(chunk.choices) > 0:
                 choice = chunk.choices[0]
@@ -3454,7 +3485,49 @@ async def _aprocess_stream_chunks_with_accumulator(
                 )
 
                 self.record_message(final_message)
-                break
+            elif chunk.usage and not chunk.choices:
+                # Handle final chunk with usage but empty choices
+                # This happens when stream_options={"include_usage": True}
+                # Update the final usage from this chunk
+                self._update_token_usage_tracker(
+                    step_token_usage, safe_model_dump(chunk.usage)
+                )
+
+                # Create final response with final usage
+                final_content = content_accumulator.get_full_content()
+                if final_content.strip():
+                    final_message = BaseMessage(
+                        role_name=self.role_name,
+                        role_type=self.role_type,
+                        meta_dict={},
+                        content=final_content,
+                    )
+
+                    if response_format:
+                        self._try_format_message(
+                            final_message, response_format
+                        )
+
+                    # Create final response with final usage (not partial)
+                    final_response = ChatAgentResponse(
+                        msgs=[final_message],
+                        terminated=False,
+                        info={
+                            "id": getattr(chunk, 'id', ''),
+                            "usage": step_token_usage.copy(),
+                            "finish_reasons": ["stop"],
+                            "num_tokens": self._get_token_count(final_content),
+                            "tool_calls": tool_call_records or [],
+                            "external_tool_requests": None,
+                            "streaming": False,
+                            "partial": False,
+                        },
+                    )
+                    yield final_response
+                break
+            elif stream_completed:
+                # If we've already seen finish_reason but no usage chunk, exit
+                break
 
         # Yield the final status as a tuple
         yield (stream_completed, tool_calls_complete)
```
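
Both loops now rely on the same ordering contract: content deltas arrive first, a chunk carrying `finish_reason` marks logical completion, and, only when `include_usage` is enabled, one usage-only chunk follows; the trailing `elif stream_completed: break` guard keeps the loop from hanging when a provider never sends that chunk. A toy model of that state machine, with `SimpleNamespace` standing in for the SDK's chunk objects and made-up token counts:

```python
# Toy model of the chunk ordering assumed by the loops above.
# SimpleNamespace stands in for SDK chunk objects; the numbers are made up.
from types import SimpleNamespace as NS

chunks = [
    NS(choices=[NS(delta=NS(content="Hel"), finish_reason=None)], usage=None),
    NS(choices=[NS(delta=NS(content="lo"), finish_reason="stop")], usage=None),
    NS(choices=[], usage=NS(prompt_tokens=9, completion_tokens=2, total_tokens=11)),
]

stream_completed = False
for chunk in chunks:
    if chunk.choices:
        if chunk.choices[0].finish_reason:
            stream_completed = True  # done, but a usage chunk may still follow
    elif chunk.usage:
        print("final usage:", chunk.usage.total_tokens)  # -> 11
        break
    elif stream_completed:
        break  # finish_reason seen, provider sent no usage chunk
```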

examples/agents/chatagent_stream.py

Lines changed: 15 additions & 1 deletion
```diff
@@ -40,6 +40,8 @@
     model_type=ModelType.GPT_4O_MINI,
     model_config_dict={
         "stream": True,
+        # Ask OpenAI to include token usage in the final streamed chunk
+        "stream_options": {"include_usage": True},
     },
 )
```
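
Condensed from the example file, the sync flow now looks roughly like this. This is a hedged sketch: the `ModelFactory`/`ChatAgent` setup follows CAMEL's usual pattern, and the prompt is illustrative rather than taken from the example:

```python
# Hedged, condensed sketch of the example's sync flow; the setup follows
# CAMEL's usual ModelFactory/ChatAgent pattern and the prompt is illustrative.
from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType, ModelType

model = ModelFactory.create(
    model_platform=ModelPlatformType.OPENAI,
    model_type=ModelType.GPT_4O_MINI,
    model_config_dict={
        "stream": True,
        "stream_options": {"include_usage": True},
    },
)
agent = ChatAgent(system_message="You are a helpful assistant.", model=model)

streaming_response = agent.step("Write a haiku about token counting.")
previous_content = ""
for response in streaming_response:
    current_content = response.msgs[0].content
    # Print only the newly accumulated delta, as the example does
    print(current_content[len(previous_content):], end="", flush=True)
    previous_content = current_content

# The last yielded response carries the provider-reported usage
usage = response.info.get("usage", {})
print(f"\ntotal={usage.get('total_tokens')}")
```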

```diff
@@ -103,6 +105,12 @@ def test_content_accumulation():
 
         previous_content = current_content
 
+    usage = response.info.get("usage", {})
+    print(
+        f"\n\nUsage: prompt={usage.get('prompt_tokens')}, "
+        f"completion={usage.get('completion_tokens')}, "
+        f"total={usage.get('total_tokens')}"
+    )
     print("\n✅ Content accumulation test passed!")
     print("\n" + "=" * 50)
     return True
```
```diff
@@ -147,6 +155,13 @@ async def test_async_tool_execution():
 
         previous_content = current_content
 
+    final_response = await streaming_response
+    usage = final_response.info.get("usage", {})
+    print(
+        f"\n\nUsage: prompt={usage.get('prompt_tokens')}, "
+        f"completion={usage.get('completion_tokens')}, "
+        f"total={usage.get('total_tokens')}"
+    )
     print("\n" + "=" * 50)
     return True
```

```diff
@@ -291,7 +306,6 @@ async def run_all_tests():
         return
 
     # Test async structured output
-
     if not await test_async_structured_output():
         print("❌ Async structured output test failed!")
         return
```
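
The async test additionally awaits the streaming response object after iterating it; per the hunk above, that is what yields the final, non-partial response carrying the usage. A hedged sketch of that pattern (obtaining the streaming response via `astep` and its awaitability are assumptions inferred from the example, and the setup mirrors the sync sketch):

```python
# Hedged async sketch of the awaitable streaming-response pattern in the
# example's diff; astep as the async entry point is an assumption here.
import asyncio

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType, ModelType


async def main() -> None:
    model = ModelFactory.create(
        model_platform=ModelPlatformType.OPENAI,
        model_type=ModelType.GPT_4O_MINI,
        model_config_dict={
            "stream": True,
            "stream_options": {"include_usage": True},
        },
    )
    agent = ChatAgent(system_message="You are a helpful assistant.", model=model)

    streaming_response = await agent.astep("Add 15 and 27, then double it.")
    async for response in streaming_response:
        pass  # partial responses stream here, as in the sync case

    # Awaiting the streaming response yields the final, non-partial response
    final_response = await streaming_response
    usage = final_response.info.get("usage", {})
    print(f"total={usage.get('total_tokens')}")


asyncio.run(main())
```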
