Skip to content

Python: Emit token usage with streaming chat completion agent. #12416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion python/samples/concepts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@

- [Chat Completion Agent as Kernel Function](./agents/chat_completion_agent/chat_completion_agent_as_kernel_function.py)
- [Chat Completion Agent Function Termination](./agents/chat_completion_agent/chat_completion_agent_function_termination.py)
- [Chat Completion Agent Templating](./agents/chat_completion_agent/chat_completion_agent_prompt_templating.py)
- [Chat Completion Agent Message Callback Streaming](./agents/chat_completion_agent/chat_completion_agent_message_callback_streaming.py)
- [Chat Completion Agent Message Callback](./agents/chat_completion_agent/chat_completion_agent_message_callback.py)
- [Chat Completion Agent Templating](./agents/chat_completion_agent/chat_completion_agent_prompt_templating.py)
- [Chat Completion Agent Streaming Token Usage](./agents/chat_completion_agent/chat_completion_agent_streaming_token_usage.py)
- [Chat Completion Agent Summary History Reducer Agent Chat](./agents/chat_completion_agent/chat_completion_agent_summary_history_reducer_agent_chat.py)
- [Chat Completion Agent Summary History Reducer Single Agent](./agents/chat_completion_agent/chat_completion_agent_summary_history_reducer_single_agent.py)
- [Chat Completion Agent Token Usage](./agents/chat_completion_agent/chat_completion_agent_token_usage.py)
- [Chat Completion Agent Truncate History Reducer Agent Chat](./agents/chat_completion_agent/chat_completion_agent_truncate_history_reducer_agent_chat.py)
- [Chat Completion Agent Truncate History Reducer Single Agent](./agents/chat_completion_agent/chat_completion_agent_truncate_history_reducer_single_agent.py)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
from typing import Annotated

from semantic_kernel.agents import ChatCompletionAgent, ChatHistoryAgentThread
from semantic_kernel.connectors.ai.completion_usage import CompletionUsage
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.functions import kernel_function

"""
The following sample demonstrates how to create a chat completion agent
and use it with streaming responses. It also shows how to track token
usage during the streaming process.
"""


# Sample plugin exposing menu information to the agent as kernel functions.
class MenuPlugin:
    """A sample Menu Plugin used for the concept sample."""

    @kernel_function(description="Provides a list of specials from the menu.")
    def get_specials(self) -> Annotated[str, "Returns the specials from the menu."]:
        """Return the daily specials as a fixed multi-line string."""
        return """
        Special Soup: Clam Chowder
        Special Salad: Cobb Salad
        Special Drink: Chai Tea
        """

    @kernel_function(description="Provides the price of the requested menu item.")
    def get_item_price(
        self, menu_item: Annotated[str, "The name of the menu item."]
    ) -> Annotated[str, "Returns the price of the menu item."]:
        """Return a flat demo price regardless of the item requested."""
        return "$9.99"


async def main() -> None:
    """Stream agent responses for several user turns and total the token usage.

    Creates a ``ChatCompletionAgent`` backed by Azure OpenAI, streams each
    response chunk to stdout, and accumulates any ``usage`` found in the
    chunk metadata into a single ``CompletionUsage`` printed at the end.
    """
    agent = ChatCompletionAgent(
        service=AzureChatCompletion(),
        name="Assistant",
        instructions="Answer questions about the menu.",
        plugins=[MenuPlugin()],
    )

    # Create a thread for the agent.
    # If no thread is provided, a new thread will be
    # created and returned with the initial response.
    # Annotated as Optional: it starts as None and is replaced by the
    # thread carried on each response.
    thread: ChatHistoryAgentThread | None = None

    user_inputs = [
        "Hello",
        "What is the special soup?",
        "How much does that cost?",
        "Thank you",
    ]

    # Running total across all turns; CompletionUsage supports "+".
    completion_usage = CompletionUsage()

    for user_input in user_inputs:
        print(f"\n# User: '{user_input}'")
        async for response in agent.invoke_stream(
            messages=user_input,
            thread=thread,
        ):
            if response.content:
                print(response.content, end="", flush=True)
            # Accumulate usage whenever a chunk's metadata reports it.
            if response.metadata.get("usage"):
                completion_usage += response.metadata["usage"]
                print(f"\nStreaming Usage: {response.metadata['usage']}")
            thread = response.thread
        print()

    # Print the completion usage
    print(f"\nStreaming Total Completion Usage: {completion_usage.model_dump_json(indent=4)}")

"""
Sample Output:

# User: 'Hello'
Hello! How can I help you with the menu today?

# User: 'What is the special soup?'
The special soup today is Clam Chowder. Would you like more details or are you interested in something else from
the menu?

# User: 'How much does that cost?'
The Clam Chowder special soup costs $9.99. Would you like to add it to your order or ask about something else?

# User: 'Thank you'
You're welcome! If you have any more questions or need help with the menu, just let me know. Enjoy your meal!

Streaming Total Completion Usage: {
"prompt_tokens": 1150,
"prompt_tokens_details": {
"audio_tokens": 0,
"cached_tokens": 0
},
"completion_tokens": 134,
"completion_tokens_details": {
"accepted_prediction_tokens": 0,
"audio_tokens": 0,
"reasoning_tokens": 0,
"rejected_prediction_tokens": 0
}
}
"""


# Script entry point: run the async sample on asyncio's event loop.
if __name__ == "__main__":
    asyncio.run(main())
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
from typing import Annotated

from semantic_kernel.agents import ChatCompletionAgent, ChatHistoryAgentThread
from semantic_kernel.connectors.ai.completion_usage import CompletionUsage
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.functions import kernel_function

"""
The following sample demonstrates how to create a chat completion agent
and use it with non-streaming responses. It also shows how to track token
usage during agent invoke.
"""


# Sample plugin that surfaces menu data to the agent via kernel functions.
class MenuPlugin:
    """A sample Menu Plugin used for the concept sample."""

    @kernel_function(description="Provides a list of specials from the menu.")
    def get_specials(self) -> Annotated[str, "Returns the specials from the menu."]:
        """Return the hard-coded list of today's specials."""
        return """
        Special Soup: Clam Chowder
        Special Salad: Cobb Salad
        Special Drink: Chai Tea
        """

    @kernel_function(description="Provides the price of the requested menu item.")
    def get_item_price(
        self, menu_item: Annotated[str, "The name of the menu item."]
    ) -> Annotated[str, "Returns the price of the menu item."]:
        """Return a fixed demo price for the requested item."""
        return "$9.99"


async def main() -> None:
    """Invoke the agent (non-streaming) over several turns and total token usage.

    Creates a ``ChatCompletionAgent`` backed by Azure OpenAI, prints each
    full response, and sums any ``usage`` found in the response metadata
    into a single ``CompletionUsage`` printed at the end.
    """
    agent = ChatCompletionAgent(
        service=AzureChatCompletion(),
        name="Assistant",
        instructions="Answer questions about the menu.",
        plugins=[MenuPlugin()],
    )

    # Create a thread for the agent.
    # If no thread is provided, a new thread will be
    # created and returned with the initial response.
    # Annotated as Optional: it starts as None and is replaced by the
    # thread carried on each response.
    thread: ChatHistoryAgentThread | None = None

    user_inputs = [
        "Hello",
        "What is the special soup?",
        "How much does that cost?",
        "Thank you",
    ]

    # Running total across all turns; CompletionUsage supports "+".
    completion_usage = CompletionUsage()

    for user_input in user_inputs:
        print(f"\n# User: '{user_input}'")
        async for response in agent.invoke(
            messages=user_input,
            thread=thread,
        ):
            if response.content:
                print(response.content)
            # Accumulate usage whenever the response metadata reports it.
            if response.metadata.get("usage"):
                completion_usage += response.metadata["usage"]
            thread = response.thread
        print()

    # Print the completion usage
    print(f"\nNon-Streaming Total Completion Usage: {completion_usage.model_dump_json(indent=4)}")

"""
Sample Output:

# User: 'Hello'
Hello! How can I help you with the menu today?


# User: 'What is the special soup?'
The special soup today is Clam Chowder. Would you like to know more about it or see the other specials?


# User: 'How much does that cost?'
The Clam Chowder special costs $9.99. Would you like to add that to your order or need more information?


# User: 'Thank you'
You're welcome! If you have any more questions or need help with the menu, just let me know. Enjoy your day!

Non-Streaming Total Completion Usage: {
"prompt_tokens": 772,
"prompt_tokens_details": {
"audio_tokens": 0,
"cached_tokens": 0
},
"completion_tokens": 92,
"completion_tokens_details": {
"accepted_prediction_tokens": 0,
"audio_tokens": 0,
"reasoning_tokens": 0,
"rejected_prediction_tokens": 0
}
}
"""


# Script entry point: run the async sample on asyncio's event loop.
if __name__ == "__main__":
    asyncio.run(main())
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ async def invoke_stream(

if (
role == AuthorRole.ASSISTANT
and response.items
and (response.items or response.metadata.get("usage"))
and not any(
isinstance(item, (FunctionCallContent, FunctionResultContent)) for item in response.items
)
Expand Down
35 changes: 32 additions & 3 deletions python/semantic_kernel/connectors/ai/completion_usage.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,56 @@
# Copyright (c) Microsoft. All rights reserved.


from openai.types import CompletionUsage as OpenAICompletionUsage
from openai.types.completion_usage import CompletionTokensDetails, PromptTokensDetails

from semantic_kernel.kernel_pydantic import KernelBaseModel


class CompletionUsage(KernelBaseModel):
    """A class representing the usage of tokens in a completion request.

    All fields are Optional: a service may omit any of them, in which case
    the field stays None and is treated as zero when usages are added.
    """

    # Prompt-side token counts as reported by the service.
    prompt_tokens: int | None = None
    prompt_tokens_details: PromptTokensDetails | None = None
    # Completion-side token counts as reported by the service.
    completion_tokens: int | None = None
    completion_tokens_details: CompletionTokensDetails | None = None

    @classmethod
    def from_openai(cls, openai_completion_usage: OpenAICompletionUsage) -> "CompletionUsage":
        """Create a CompletionUsage instance from an OpenAICompletionUsage instance."""
        return cls(
            prompt_tokens=openai_completion_usage.prompt_tokens,
            prompt_tokens_details=openai_completion_usage.prompt_tokens_details
            if openai_completion_usage.prompt_tokens_details
            else None,
            completion_tokens=openai_completion_usage.completion_tokens,
            completion_tokens_details=openai_completion_usage.completion_tokens_details
            if openai_completion_usage.completion_tokens_details
            else None,
        )

    def __add__(self, other: "CompletionUsage") -> "CompletionUsage":
        """Combine two CompletionUsage instances by summing their token counts.

        A None field on both sides stays None; otherwise None is treated as 0
        so partial reports still accumulate.
        """

        def _merge_details(detail_cls, left, right):
            """Merge two optional details objects by summing their annotated fields.

            `detail_cls` is the concrete details model to build (renamed from
            `cls` to avoid shadowing the classmethod convention).
            """
            if left is None and right is None:
                return None
            merged = {}
            # NOTE(review): __annotations__ lists only fields declared directly
            # on detail_cls, not inherited ones — sufficient for the flat
            # OpenAI details models used here.
            for field in detail_cls.__annotations__:
                x = getattr(left, field, None)
                y = getattr(right, field, None)
                merged[field] = None if x is None and y is None else (x or 0) + (y or 0)
            return detail_cls(**merged)

        return CompletionUsage(
            prompt_tokens=(self.prompt_tokens or 0) + (other.prompt_tokens or 0),
            completion_tokens=(self.completion_tokens or 0) + (other.completion_tokens or 0),
            prompt_tokens_details=_merge_details(
                PromptTokensDetails, self.prompt_tokens_details, other.prompt_tokens_details
            ),
            completion_tokens_details=_merge_details(
                CompletionTokensDetails, self.completion_tokens_details, other.completion_tokens_details
            ),
        )
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest

from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.connectors.ai.completion_usage import CompletionUsage
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion, OpenAIChatCompletion
from semantic_kernel.contents import AuthorRole, ChatMessageContent, StreamingChatMessageContent
from semantic_kernel.contents.image_content import ImageContent
Expand Down Expand Up @@ -86,10 +87,15 @@ async def test_invoke(self, chat_completion_agent: ChatCompletionAgent, agent_te
"""Test invoke of the agent."""
responses = await agent_test_base.get_invoke_with_retry(chat_completion_agent, messages="Hello")
assert len(responses) > 0
usage: CompletionUsage = CompletionUsage()
for response in responses:
assert isinstance(response.message, ChatMessageContent)
assert response.message.role == AuthorRole.ASSISTANT
assert response.message.content is not None
if response.metadata.get("usage"):
usage += response.metadata["usage"]
assert usage.prompt_tokens > 0
assert usage.completion_tokens > 0

@pytest.mark.parametrize("chat_completion_agent", ["azure", "openai"], indirect=True, ids=["azure", "openai"])
async def test_invoke_with_thread(self, chat_completion_agent: ChatCompletionAgent, agent_test_base: AgentTestBase):
Expand All @@ -115,10 +121,15 @@ async def test_invoke_stream(self, chat_completion_agent: ChatCompletionAgent, a
"""Test invoke stream of the agent."""
responses = await agent_test_base.get_invoke_stream_with_retry(chat_completion_agent, messages="Hello")
assert len(responses) > 0
usage: CompletionUsage = CompletionUsage()
for response in responses:
assert isinstance(response.message, StreamingChatMessageContent)
assert response.message.role == AuthorRole.ASSISTANT
assert response.message.content is not None
if response.metadata.get("usage"):
usage += response.metadata["usage"]
assert usage.prompt_tokens > 0
assert usage.completion_tokens > 0

@pytest.mark.parametrize("chat_completion_agent", ["azure", "openai"], indirect=True, ids=["azure", "openai"])
async def test_invoke_stream_with_thread(
Expand Down
Loading