
Commit b5390e9

Fix missing space in the stream listener (#8360)
1 parent: e3b3092

2 files changed: +34 −3


dspy/streaming/streaming_listener.py

Lines changed: 4 additions & 3 deletions
@@ -99,7 +99,7 @@ def receive(self, chunk: ModelResponseStream):
             # We keep appending the tokens to the queue until we have a full identifier or the concanated
             # tokens no longer match our expected identifier.
             self.field_start_queue.append(chunk_message)
-            concat_message = "".join(self.field_start_queue).strip()
+            concat_message = "".join(self.field_start_queue)

             if start_identifier in concat_message:
                 # We have a full identifier, we can start the stream.
@@ -113,8 +113,9 @@ def receive(self, chunk: ModelResponseStream):
                 # because there could be a few splitters between ':' and '"', e.g., '"name": "value"'.
                 chunk_message = chunk_message[1:]

-            elif self._buffered_message_end_with_start_identifier(concat_message, start_identifier):
-                # If the buffered message ends with part of the start_identifier, we can start the stream.
+            elif self._buffered_message_end_with_start_identifier(concat_message.strip(), start_identifier):
+                # If the buffered message ends with part of the start_identifier, we keep looking for the
+                # start_identifier from the token stream.
                 return
             else:
                 # Doesn't match the expected identifier, reset the queue.
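
To make the effect of moving `.strip()` concrete, here is a minimal, self-contained sketch (illustrative values only, not DSPy's actual listener code): stripping the joined buffer also trims the trailing space of the last buffered token, so the text flushed once the `[[ ## answer ## ]]` identifier is confirmed can lose a space, which is presumably the "missing space" this commit fixes. After the change, only the copy used for the partial-identifier check is stripped.

# Illustration only; names and token values are hypothetical, not taken from streaming_listener.py.
buffered_tokens = ["[[ ## answer ## ]]\n", "How "]

# Before this commit: the whole buffer was stripped when joined, so the trailing
# space of "How " is gone from the text that is later flushed to the listener.
old_concat = "".join(buffered_tokens).strip()   # -> '[[ ## answer ## ]]\nHow'

# After this commit: the buffer is joined as-is; only the copy used for the
# partial-identifier check is stripped, so payload whitespace survives.
new_concat = "".join(buffered_tokens)           # -> '[[ ## answer ## ]]\nHow '
partial_check_input = new_concat.strip()        # stripped copy used only for matching

print(repr(old_concat))   # '[[ ## answer ## ]]\nHow'
print(repr(new_concat))   # '[[ ## answer ## ]]\nHow '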

tests/streaming/test_streaming.py

Lines changed: 30 additions & 0 deletions
@@ -229,6 +229,36 @@ def __call__(self, x: str, **kwargs):
     assert all_chunks[-1].signature_field_name == "judgement"


+@pytest.mark.anyio
+async def test_streaming_handles_space_correctly():
+    dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.ChatAdapter())
+    my_program = dspy.Predict("question->answer")
+    program = dspy.streamify(
+        my_program, stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")]
+    )
+
+    async def gpt_4o_mini_stream(*args, **kwargs):
+        yield ModelResponseStream(
+            model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="[[ ## answer ## ]]\n"))]
+        )
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="How "))])
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="are "))])
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="you "))])
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="doing?"))])
+        yield ModelResponseStream(
+            model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="\n\n[[ ## completed ## ]]"))]
+        )
+
+    with mock.patch("litellm.acompletion", side_effect=gpt_4o_mini_stream):
+        output = program(question="What is the capital of France?")
+        all_chunks = []
+        async for value in output:
+            if isinstance(value, dspy.streaming.StreamResponse):
+                all_chunks.append(value)
+
+    assert all_chunks[0].chunk == "How are you doing?"
+
+
 @pytest.mark.skipif(not os.getenv("OPENAI_API_KEY"), reason="OpenAI API key not found in environment variables")
 def test_sync_streaming():
     class MyProgram(dspy.Module):

0 commit comments
