Skip to content

feat: add to_dict and from_dict to StreamingChunk #9608

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion haystack/dataclasses/streaming_chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0

from dataclasses import dataclass, field
from dataclasses import asdict, dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Literal, Optional, Union, overload

from haystack.core.component import Component
Expand Down Expand Up @@ -102,6 +102,71 @@ def __post_init__(self):
if (self.tool_calls or self.tool_call_result) and self.index is None:
raise ValueError("If `tool_call`, or `tool_call_result` is set, `index` must also be set.")

def to_dict(self) -> Dict[str, Any]:
"""
Returns a dictionary representation of the StreamingChunk.

:returns: Serialized dictionary representation of the calling object.
"""
return {
"content": self.content,
"meta": self.meta,
"component_info": asdict(self.component_info) if self.component_info else None,
"index": self.index,
"tool_calls": [asdict(tc) for tc in self.tool_calls] if self.tool_calls else None,
"tool_call_result": asdict(self.tool_call_result) if self.tool_call_result else None,
"start": self.start,
"finish_reason": self.finish_reason,
}

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "StreamingChunk":
"""
Creates a deserialized StreamingChunk instance from a serialized representation.

:param data: Dictionary containing the StreamingChunk's attributes.
:returns: A StreamingChunk instance.
"""
if "content" not in data:
raise ValueError("Missing field 'content' in StreamingChunk deserialization. Field 'content' is required.")

component_info = data.get("component_info")
if isinstance(component_info, dict):
component_info = ComponentInfo(**component_info)
elif not (component_info is None or isinstance(component_info, ComponentInfo)):
raise TypeError("component_info must be of type dict or ComponentInfo.")

tool_calls = data.get("tool_calls")
if tool_calls is not None:
if not isinstance(tool_calls, list):
raise TypeError("tool_calls must be a list of ToolCallDelta.")
checked_tool_calls = []
for tool_call in tool_calls:
if isinstance(tool_call, dict):
checked_tool_calls.append(ToolCallDelta(**tool_call))
elif isinstance(tool_call, ToolCallDelta):
checked_tool_calls.append(tool_call)
else:
raise TypeError("Each element of tool_calls must be of type dict or ToolCallDelta.")
tool_calls = checked_tool_calls

tool_call_result = data.get("tool_call_result")
if isinstance(tool_call_result, dict):
tool_call_result = ToolCallResult(**tool_call_result)
elif not (tool_call_result is None or isinstance(tool_call_result, ToolCallResult)):
raise TypeError("tool_call_result must be of type dict or ToolCallResult.")

return StreamingChunk(
content=data["content"],
meta=data.get("meta", {}),
component_info=component_info,
index=data.get("index"),
tool_calls=tool_calls,
tool_call_result=tool_call_result,
start=data.get("start", False),
finish_reason=data.get("finish_reason"),
)


SyncStreamingCallbackT = Callable[[StreamingChunk], None]
AsyncStreamingCallbackT = Callable[[StreamingChunk], Awaitable[None]]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
Add `to_dict` and `from_dict` to StreamingChunk to make it consistent with our other dataclasses in having serialization and deserialization methods.
53 changes: 53 additions & 0 deletions test/dataclasses/test_streaming_chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,56 @@ def test_finish_reason_tool_call_results():
assert chunk.finish_reason == "tool_call_results"
assert chunk.meta["finish_reason"] == "tool_call_results"
assert chunk.content == ""


def test_to_dict():
"""Test the to_dict method for StreamingChunk."""
component = ExampleComponent()
component_info = ComponentInfo.from_component(component)
tool_call_result = ToolCallResult(
result="output", origin=ToolCall(id="123", tool_name="test_tool", arguments={"arg1": "value1"}), error=False
)

chunk = StreamingChunk(
content="",
meta={"key": "value"},
index=0,
component_info=component_info,
tool_call_result=tool_call_result,
finish_reason="tool_call_results",
)

d = chunk.to_dict()

assert d["content"] == ""
assert d["meta"] == {"key": "value"}
assert d["index"] == 0
assert d["component_info"]["type"] == "test_streaming_chunk.ExampleComponent"
assert d["tool_call_result"]["origin"]["id"] == "123"
assert d["tool_call_result"]["origin"]["arguments"]["arg1"] == "value1"
assert d["finish_reason"] == "tool_call_results"


def test_from_dict():
"""Test the from_dict method for StreamingChunk."""
component_info = ComponentInfo(type="test_streaming_chunk.ExampleComponent", name="test_component")
tool_calls = [{"id": "123", "tool_name": "test_tool", "arguments": '{"arg1": "value1"}', "index": 0}]

data = {
"content": "",
"meta": {"key": "value"},
"index": 0,
"component_info": component_info,
"tool_calls": tool_calls,
"finish_reason": "tool_calls",
}

chunk = StreamingChunk.from_dict(data)

assert chunk.content == ""
assert chunk.meta == {"key": "value"}
assert chunk.index == 0
assert chunk.component_info.type == "test_streaming_chunk.ExampleComponent"
assert chunk.component_info.name == "test_component"
assert chunk.tool_calls[0].tool_name == "test_tool"
assert chunk.finish_reason == "tool_calls"
Loading