Skip to content

Add token streaming support for XMLAdapter #8478

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

chenmoneygithub
Copy link
Collaborator

@chenmoneygithub chenmoneygithub commented Jul 1, 2025

Support XMLAdapter in StreamListener

Note

This PR is generated by OpenAI codex, I am testing how deep it can understand DSPy codebase with this specific use case.

It passes on my tests:

import asyncio

import dspy

# lm = dspy.LM("gemini/gemini-2.5-flash-preview-04-17", cache=False)
lm = dspy.LM("databricks/databricks-claude-3-7-sonnet", cache=False)
# lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm, adapter=dspy.XMLAdapter())


class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()

        self.predict1 = dspy.Predict("question->answer")
        self.predict2 = dspy.Predict("question, answer->judgement, score")

    def forward(self, question: str, **kwargs):
        answer = self.predict1(question=question)
        judgement = self.predict2(question=question, answer=answer)
        return judgement


predict = MyModule()
stream_listeners = [
    dspy.streaming.StreamListener(signature_field_name="answer"),
    dspy.streaming.StreamListener(signature_field_name="judgement"),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
    include_final_prediction_in_output_stream=False,
)


async def main():
    with dspy.settings.context(adapter=dspy.XMLAdapter()):
        output = stream_predict(question="why did a chicken cross the kitchen?")

        return_value = None
        async for value in output:
            if isinstance(value, dspy.streaming.StatusMessage):
                print(value.message)
            elif isinstance(value, dspy.Prediction):
                print(value)
            elif isinstance(value, dspy.streaming.StreamResponse):
                print(value)
            else:
                print(value.choices[0].delta.content)
    return return_value


import time

start = time.time()
asyncio.run(main())
end = time.time()


print(f"Time taken: {end - start} seconds")

dspy.inspect_history()

@chenmoneygithub chenmoneygithub changed the title Add token streaming support for XMLAdapter [WIP] Add token streaming support for XMLAdapter Jul 1, 2025
@chenmoneygithub chenmoneygithub marked this pull request as draft July 1, 2025 23:22
@chenmoneygithub chenmoneygithub marked this pull request as ready for review July 3, 2025 21:05
@chenmoneygithub chenmoneygithub changed the title [WIP] Add token streaming support for XMLAdapter Add token streaming support for XMLAdapter Jul 3, 2025
start_identifier = self.xml_adapter_start_identifier
end_identifier = self.xml_adapter_end_identifier

start_indicator = "<"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also make start_indicator an instance variable like we do for start_identifier and end_identifier?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned above, maybe it's better to move the logic into each adapter class and remove if-else branch

start_identifier = settings.adapter.get_start_identifier(self.signature_field_name)
end_identifier= settings.adapter.get_end_identifier(self.signature_field_name)
start_indicator= settings.adapter.get_start_indicator()

elif isinstance(settings.adapter, ChatAdapter) or settings.adapter is None:
start_identifier = self.chat_adapter_start_identifier
end_identifier = self.chat_adapter_end_identifier

start_indicator = "["
else:
raise ValueError(
f"Unsupported adapter for streaming: {settings.adapter}, please use either ChatAdapter or "
f"Unsupported adapter for streaming: {settings.adapter}, please use either ChatAdapter, XMLAdapter or "
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we define const for a list of adapters that support streaming?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sg!

@@ -51,6 +52,9 @@ def __init__(
self.chat_adapter_start_identifier = f"[[ ## {self.signature_field_name} ## ]]"
self.chat_adapter_end_identifier = re.compile(r"\[\[ ## (\w+) ## \]\]")

self.xml_adapter_start_identifier = f"<{self.signature_field_name}>"
self.xml_adapter_end_identifier = re.compile(rf"</{self.signature_field_name}>")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we already have three adapters, shall we define a mapping for adapter-specific parameters? Alternatively, we can move the identifier into each adapter class for better encapsulation

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we define a mapping for adapter-specific parameters?
Yes, good call!

we can move the identifier into each adapter class for better encapsulation
This is a little subtle, the start_identifier/end_identifier is not a feature of the adapter, but a feature of the stream listener. The boundary is a bit vague though - adapter can fully function without start_identifier/end_identifier, and different listeners could introduce different start_identifier/end_identifier for the same adapter, so it belongs more to the stream listeners.

elif isinstance(settings.adapter, ChatAdapter) or settings.adapter is None:
boundary_index = last_tokens.find("[[")
return last_tokens[:boundary_index]
boundary_index = last_tokens.find(f"</{self.signature_field_name}>")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: why do we need this change for ChatAdapter?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, it's a codex mess

else:
raise ValueError(
f"Unsupported adapter for streaming: {settings.adapter}, please use either ChatAdapter or "
f"Unsupported adapter for streaming: {settings.adapter}, please use either ChatAdapter, XMLAdapter or "
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

Copy link
Collaborator

@TomeHirata TomeHirata left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants