Skip to content

Add token streaming support for XMLAdapter #8478

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 34 additions & 20 deletions dspy/streaming/streaming_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

from dspy.adapters.chat_adapter import ChatAdapter
from dspy.adapters.json_adapter import JSONAdapter
from dspy.adapters.xml_adapter import XMLAdapter
from dspy.dsp.utils.settings import settings
from dspy.streaming.messages import StreamResponse

if TYPE_CHECKING:
from dspy.primitives.module import Module

ADAPTER_SUPPORT_STREAMING = [ChatAdapter, XMLAdapter, JSONAdapter]


class StreamListener:
"""Class that listens to the stream to capture the streeaming of a specific output field of a predictor."""
Expand Down Expand Up @@ -45,11 +48,23 @@ def __init__(
self.cache_hit = False
self.allow_reuse = allow_reuse

self.json_adapter_start_identifier = f'"{self.signature_field_name}":'
self.json_adapter_end_identifier = re.compile(r"\w*\"(,|\s*})")

self.chat_adapter_start_identifier = f"[[ ## {self.signature_field_name} ## ]]"
self.chat_adapter_end_identifier = re.compile(r"\[\[ ## (\w+) ## \]\]")
self.adapter_identifiers = {
"ChatAdapter": {
"start_identifier": f"[[ ## {self.signature_field_name} ## ]]",
"end_identifier": re.compile(r"\[\[ ## (\w+) ## \]\]"),
"start_indicator": "[",
},
"JSONAdapter": {
"start_identifier": f'"{self.signature_field_name}":',
"end_identifier": re.compile(r"\w*\"(,|\s*})"),
"start_indicator": '"',
},
"XMLAdapter": {
"start_identifier": f"<{self.signature_field_name}>",
"end_identifier": re.compile(rf"</{self.signature_field_name}>"),
"start_indicator": "<",
},
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we already have three adapters, shall we define a mapping for adapter-specific parameters? Alternatively, we can move the identifier into each adapter class for better encapsulation

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we define a mapping for adapter-specific parameters?
Yes, good call!

we can move the identifier into each adapter class for better encapsulation
This is a little subtle, the start_identifier/end_identifier is not a feature of the adapter, but a feature of the stream listener. The boundary is a bit vague though - adapter can fully function without start_identifier/end_identifier, and different listeners could introduce different start_identifier/end_identifier for the same adapter, so it belongs more to the stream listeners.

def _buffered_message_end_with_start_identifier(self, concat_message: str, start_identifier: str) -> str:
for i in range(len(concat_message)):
Expand All @@ -58,21 +73,15 @@ def _buffered_message_end_with_start_identifier(self, concat_message: str, start
return False

def receive(self, chunk: ModelResponseStream):
if isinstance(settings.adapter, JSONAdapter):
start_identifier = self.json_adapter_start_identifier
end_identifier = self.json_adapter_end_identifier

start_indicator = '"'
elif isinstance(settings.adapter, ChatAdapter) or settings.adapter is None:
start_identifier = self.chat_adapter_start_identifier
end_identifier = self.chat_adapter_end_identifier

start_indicator = "["
else:
adapter_name = settings.adapter.__class__.__name__ if settings.adapter else "ChatAdapter"
if adapter_name not in self.adapter_identifiers:
raise ValueError(
f"Unsupported adapter for streaming: {settings.adapter}, please use either ChatAdapter or "
"JSONAdapter for streaming purposes."
f"Unsupported adapter for streaming: {adapter_name}, please use one of the following adapters: "
f"{', '.join([a.__name__ for a in ADAPTER_SUPPORT_STREAMING])}"
)
start_identifier = self.adapter_identifiers[adapter_name]["start_identifier"]
end_identifier = self.adapter_identifiers[adapter_name]["end_identifier"]
start_indicator = self.adapter_identifiers[adapter_name]["start_indicator"]

if self.stream_end:
if self.allow_reuse:
Expand Down Expand Up @@ -175,13 +184,18 @@ def flush(self) -> str:
else:
boundary_index = len(last_tokens)
return last_tokens[:boundary_index]
elif isinstance(settings.adapter, XMLAdapter):
boundary_index = last_tokens.find(f"</{self.signature_field_name}>")
if boundary_index == -1:
boundary_index = len(last_tokens)
return last_tokens[:boundary_index]
elif isinstance(settings.adapter, ChatAdapter) or settings.adapter is None:
boundary_index = last_tokens.find("[[")
return last_tokens[:boundary_index]
else:
raise ValueError(
f"Unsupported adapter for streaming: {settings.adapter}, please use either ChatAdapter or "
"JSONAdapter for streaming purposes."
f"Unsupported adapter for streaming: {settings.adapter}, please use one of the following adapters: "
f"{', '.join([a.__name__ for a in ADAPTER_SUPPORT_STREAMING])}"
)


Expand Down
79 changes: 77 additions & 2 deletions tests/streaming/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ async def gpt_4o_mini_stream_2():
async def completion_side_effect(*args, **kwargs):
return stream_generators.pop(0)() # return new async generator instance

with mock.patch("litellm.acompletion", side_effect=completion_side_effect) as mock_completion:
with mock.patch("litellm.acompletion", side_effect=completion_side_effect):
program = dspy.streamify(
MyProgram(),
stream_listeners=[
Expand Down Expand Up @@ -484,7 +484,7 @@ async def gpt_4o_mini_stream_2(*args, **kwargs):

with mock.patch(
"litellm.acompletion", new_callable=AsyncMock, side_effect=[gpt_4o_mini_stream_1(), gpt_4o_mini_stream_2()]
) as mock_completion:
):
program = dspy.streamify(
MyProgram(),
stream_listeners=[
Expand Down Expand Up @@ -763,3 +763,78 @@ async def completion_side_effect(*args, **kwargs):
concat_message = "".join([chunk.chunk for chunk in all_chunks])
# The listener functions twice.
assert concat_message == "To get to the other side!To get to the other side!"

@pytest.mark.anyio
async def test_stream_listener_returns_correct_chunk_xml_adapter():
class MyProgram(dspy.Module):
def __init__(self):
super().__init__()
self.predict1 = dspy.Predict("question->answer")
self.predict2 = dspy.Predict("question,answer->judgement")

def forward(self, question, **kwargs):
answer = self.predict1(question=question, **kwargs).answer
judgement = self.predict2(question=question, answer=answer, **kwargs)
return judgement

async def xml_stream_1(*args, **kwargs):
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="<"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="answer"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=">"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="To"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" get"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" to"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" the"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" other"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" side"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="!"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="<"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="/answer"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=">"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="<"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="completed"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=">"))])

async def xml_stream_2(*args, **kwargs):
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="<"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="judgement"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=">"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="The"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" answer"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" is"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" humorous"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="."))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="<"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="/judgement"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=">"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="<"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="completed"))])
yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=">"))])

stream_generators = [xml_stream_1, xml_stream_2]

async def completion_side_effect(*args, **kwargs):
return stream_generators.pop(0)()

with mock.patch("litellm.acompletion", side_effect=completion_side_effect):
program = dspy.streamify(
MyProgram(),
stream_listeners=[
dspy.streaming.StreamListener(signature_field_name="answer"),
dspy.streaming.StreamListener(signature_field_name="judgement"),
],
)
with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.XMLAdapter()):
output = program(question="why did a chicken cross the kitchen?")
all_chunks = []
async for value in output:
if isinstance(value, dspy.streaming.StreamResponse):
all_chunks.append(value)

assert all_chunks[0].predict_name == "predict1"
assert all_chunks[0].signature_field_name == "answer"
assert all_chunks[0].chunk == "To get to the other side!"

assert all_chunks[1].predict_name == "predict2"
assert all_chunks[1].signature_field_name == "judgement"
assert all_chunks[1].chunk == "The answer is humorous."