Skip to content

Commit 6651510

Browse files
KludexclaudeDouweM
authored
feat: add history_processors parameter to Agent for message processing (#1970)
Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Douwe Maan <douwe@pydantic.dev>
1 parent c33fe23 commit 6651510

File tree

4 files changed

+411
-13
lines changed

4 files changed

+411
-13
lines changed

docs/message-history.md

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,178 @@ print(result2.all_messages())
322322
"""
323323
```
324324

325+
## Processing Message History
326+
327+
Sometimes you may want to modify the message history before it's sent to the model. This could be for privacy
328+
reasons (filtering out sensitive information), to save costs on tokens, to give less context to the LLM, or
329+
to apply custom processing logic.
330+
331+
PydanticAI provides a `history_processors` parameter on `Agent` that allows you to intercept and modify
332+
the message history before each model request.
333+
334+
### Usage
335+
336+
The `history_processors` parameter is a list of callables that take a list of
337+
[`ModelMessage`][pydantic_ai.messages.ModelMessage] and return a modified list of the same type.
338+
339+
Each processor is applied in sequence, and processors can be either synchronous or asynchronous.
340+
341+
```python {title="simple_history_processor.py"}
342+
from pydantic_ai import Agent
343+
from pydantic_ai.messages import (
344+
ModelMessage,
345+
ModelRequest,
346+
ModelResponse,
347+
TextPart,
348+
UserPromptPart,
349+
)
350+
351+
352+
def filter_responses(messages: list[ModelMessage]) -> list[ModelMessage]:
353+
"""Remove all ModelResponse messages, keeping only ModelRequest messages."""
354+
return [msg for msg in messages if isinstance(msg, ModelRequest)]
355+
356+
# Create agent with history processor
357+
agent = Agent('openai:gpt-4o', history_processors=[filter_responses])
358+
359+
# Example: Create some conversation history
360+
message_history = [
361+
ModelRequest(parts=[UserPromptPart(content='What is 2+2?')]),
362+
ModelResponse(parts=[TextPart(content='2+2 equals 4')]), # This will be filtered out
363+
]
364+
365+
# When you run the agent, the history processor will filter out ModelResponse messages
366+
# result = agent.run_sync('What about 3+3?', message_history=message_history)
367+
```
368+
369+
#### Keep Only Recent Messages
370+
371+
You can use a `history_processor` to keep only the most recent messages:
372+
373+
```python {title="keep_recent_messages.py"}
374+
from pydantic_ai import Agent
375+
from pydantic_ai.messages import ModelMessage
376+
377+
378+
async def keep_recent_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
379+
"""Keep only the last 5 messages to manage token usage."""
380+
return messages[-5:] if len(messages) > 5 else messages
381+
382+
agent = Agent('openai:gpt-4o', history_processors=[keep_recent_messages])
383+
384+
# Example: Even with a long conversation history, only the last 5 messages are sent to the model
385+
long_conversation_history: list[ModelMessage] = [] # Your long conversation history here
386+
# result = agent.run_sync('What did we discuss?', message_history=long_conversation_history)
387+
```
388+
389+
#### Summarize Old Messages
390+
391+
Use an LLM to summarize older messages to preserve context while reducing tokens.
392+
393+
```python {title="summarize_old_messages.py"}
394+
from pydantic_ai import Agent
395+
from pydantic_ai.messages import ModelMessage
396+
397+
# Use a cheaper model to summarize old messages.
398+
summarize_agent = Agent(
399+
'openai:gpt-4o-mini',
400+
instructions="""
401+
Summarize this conversation, omitting small talk and unrelated topics.
402+
Focus on the technical discussion and next steps.
403+
""",
404+
)
405+
406+
407+
async def summarize_old_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
408+
# Summarize the oldest 10 messages
409+
if len(messages) > 10:
410+
oldest_messages = messages[:10]
411+
summary = await summarize_agent.run(message_history=oldest_messages)
412+
# Return the last message and the summary
413+
return summary.new_messages() + messages[-1:]
414+
415+
return messages
416+
417+
418+
agent = Agent('openai:gpt-4o', history_processors=[summarize_old_messages])
419+
```
420+
421+
### Testing History Processors
422+
423+
You can test what messages are actually sent to the model provider using
424+
[`FunctionModel`][pydantic_ai.models.function.FunctionModel]:
425+
426+
```python {title="test_history_processor.py"}
427+
import pytest
428+
429+
from pydantic_ai import Agent
430+
from pydantic_ai.messages import (
431+
ModelMessage,
432+
ModelRequest,
433+
ModelResponse,
434+
TextPart,
435+
UserPromptPart,
436+
)
437+
from pydantic_ai.models.function import AgentInfo, FunctionModel
438+
439+
440+
@pytest.fixture
441+
def received_messages() -> list[ModelMessage]:
442+
return []
443+
444+
445+
@pytest.fixture
446+
def function_model(received_messages: list[ModelMessage]) -> FunctionModel:
447+
def capture_model_function(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
448+
# Capture the messages that the provider actually receives
449+
received_messages.clear()
450+
received_messages.extend(messages)
451+
return ModelResponse(parts=[TextPart(content='Provider response')])
452+
453+
return FunctionModel(capture_model_function)
454+
455+
456+
def test_history_processor(function_model: FunctionModel, received_messages: list[ModelMessage]):
457+
def filter_responses(messages: list[ModelMessage]) -> list[ModelMessage]:
458+
return [msg for msg in messages if isinstance(msg, ModelRequest)]
459+
460+
agent = Agent(function_model, history_processors=[filter_responses])
461+
462+
message_history = [
463+
ModelRequest(parts=[UserPromptPart(content='Question 1')]),
464+
ModelResponse(parts=[TextPart(content='Answer 1')]),
465+
]
466+
467+
agent.run_sync('Question 2', message_history=message_history)
468+
assert received_messages == [
469+
ModelRequest(parts=[UserPromptPart(content='Question 1')]),
470+
ModelRequest(parts=[UserPromptPart(content='Question 2')]),
471+
]
472+
```
473+
474+
### Multiple Processors
475+
476+
You can also use multiple processors:
477+
478+
```python {title="multiple_history_processors.py"}
479+
from pydantic_ai import Agent
480+
from pydantic_ai.messages import ModelMessage, ModelRequest
481+
482+
483+
def filter_responses(messages: list[ModelMessage]) -> list[ModelMessage]:
484+
return [msg for msg in messages if isinstance(msg, ModelRequest)]
485+
486+
487+
def summarize_old_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
488+
return messages[-5:]
489+
490+
491+
agent = Agent('openai:gpt-4o', history_processors=[filter_responses, summarize_old_messages])
492+
```
493+
494+
In this case, the `filter_responses` processor will be applied first, and the
495+
`summarize_old_messages` processor will be applied second.
496+
325497
## Examples
326498

327499
For a more complete example of using messages in conversations, see the [chat app](examples/chat-app.md) example.

pydantic_ai_slim/pydantic_ai/_agent_graph.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,11 @@
1212
from opentelemetry.trace import Tracer
1313
from typing_extensions import TypeGuard, TypeVar, assert_never
1414

15+
from pydantic_ai._utils import is_async_callable, run_in_executor
1516
from pydantic_graph import BaseNode, Graph, GraphRunContext
1617
from pydantic_graph.nodes import End, NodeRunEndT
1718

18-
from . import (
19-
_output,
20-
_system_prompt,
21-
exceptions,
22-
messages as _messages,
23-
models,
24-
result,
25-
usage as _usage,
26-
)
19+
from . import _output, _system_prompt, exceptions, messages as _messages, models, result, usage as _usage
2720
from .result import OutputDataT
2821
from .settings import ModelSettings, merge_model_settings
2922
from .tools import RunContext, Tool, ToolDefinition, ToolsPrepareFunc
@@ -39,6 +32,7 @@
3932
'CallToolsNode',
4033
'build_run_context',
4134
'capture_run_messages',
35+
'HistoryProcessor',
4236
)
4337

4438

@@ -54,6 +48,11 @@
5448
DepsT = TypeVar('DepsT')
5549
OutputT = TypeVar('OutputT')
5650

51+
_HistoryProcessorSync = Callable[[list[_messages.ModelMessage]], list[_messages.ModelMessage]]
52+
_HistoryProcessorAsync = Callable[[list[_messages.ModelMessage]], Awaitable[list[_messages.ModelMessage]]]
53+
HistoryProcessor = Union[_HistoryProcessorSync, _HistoryProcessorAsync]
54+
"""A function that processes a list of model messages and returns a list of model messages."""
55+
5756

5857
@dataclasses.dataclass
5958
class GraphAgentState:
@@ -93,6 +92,8 @@ class GraphAgentDeps(Generic[DepsT, OutputDataT]):
9392
output_schema: _output.OutputSchema[OutputDataT] | None
9493
output_validators: list[_output.OutputValidator[DepsT, OutputDataT]]
9594

95+
history_processors: Sequence[HistoryProcessor]
96+
9697
function_tools: dict[str, Tool[DepsT]] = dataclasses.field(repr=False)
9798
mcp_servers: Sequence[MCPServer] = dataclasses.field(repr=False)
9899
default_retries: int
@@ -327,8 +328,9 @@ async def _stream(
327328

328329
model_settings, model_request_parameters = await self._prepare_request(ctx)
329330
model_request_parameters = ctx.deps.model.customize_request_parameters(model_request_parameters)
331+
message_history = await _process_message_history(ctx.state.message_history, ctx.deps.history_processors)
330332
async with ctx.deps.model.request_stream(
331-
ctx.state.message_history, model_settings, model_request_parameters
333+
message_history, model_settings, model_request_parameters
332334
) as streamed_response:
333335
self._did_stream = True
334336
ctx.state.usage.requests += 1
@@ -350,9 +352,8 @@ async def _make_request(
350352

351353
model_settings, model_request_parameters = await self._prepare_request(ctx)
352354
model_request_parameters = ctx.deps.model.customize_request_parameters(model_request_parameters)
353-
model_response = await ctx.deps.model.request(
354-
ctx.state.message_history, model_settings, model_request_parameters
355-
)
355+
message_history = await _process_message_history(ctx.state.message_history, ctx.deps.history_processors)
356+
model_response = await ctx.deps.model.request(message_history, model_settings, model_request_parameters)
356357
ctx.state.usage.incr(_usage.Usage())
357358

358359
return self._finish_handling(ctx, model_response)
@@ -865,3 +866,16 @@ def build_agent_graph(
865866
auto_instrument=False,
866867
)
867868
return graph
869+
870+
871+
async def _process_message_history(
872+
messages: list[_messages.ModelMessage],
873+
processors: Sequence[HistoryProcessor],
874+
) -> list[_messages.ModelMessage]:
875+
"""Process message history through a sequence of processors."""
876+
for processor in processors:
877+
if is_async_callable(processor):
878+
messages = await processor(messages)
879+
else:
880+
messages = await run_in_executor(processor, messages)
881+
return messages

pydantic_ai_slim/pydantic_ai/agent.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
result,
2929
usage as _usage,
3030
)
31+
from ._agent_graph import HistoryProcessor
3132
from .models.instrumented import InstrumentationSettings, InstrumentedModel, instrument_model
3233
from .result import FinalResult, OutputDataT, StreamedRunResult
3334
from .settings import ModelSettings, merge_model_settings
@@ -179,6 +180,7 @@ def __init__(
179180
defer_model_check: bool = False,
180181
end_strategy: EndStrategy = 'early',
181182
instrument: InstrumentationSettings | bool | None = None,
183+
history_processors: Sequence[HistoryProcessor] | None = None,
182184
) -> None: ...
183185

184186
@overload
@@ -208,6 +210,7 @@ def __init__(
208210
defer_model_check: bool = False,
209211
end_strategy: EndStrategy = 'early',
210212
instrument: InstrumentationSettings | bool | None = None,
213+
history_processors: Sequence[HistoryProcessor] | None = None,
211214
) -> None: ...
212215

213216
def __init__(
@@ -232,6 +235,7 @@ def __init__(
232235
defer_model_check: bool = False,
233236
end_strategy: EndStrategy = 'early',
234237
instrument: InstrumentationSettings | bool | None = None,
238+
history_processors: Sequence[HistoryProcessor] | None = None,
235239
**_deprecated_kwargs: Any,
236240
):
237241
"""Create an agent.
@@ -275,6 +279,9 @@ def __init__(
275279
[`Agent.instrument_all()`][pydantic_ai.Agent.instrument_all]
276280
will be used, which defaults to False.
277281
See the [Debugging and Monitoring guide](https://ai.pydantic.dev/logfire/) for more info.
282+
history_processors: Optional list of callables to process the message history before sending it to the model.
283+
Each processor takes a list of messages and returns a modified list of messages.
284+
Processors can be sync or async and are applied in sequence.
278285
"""
279286
if model is None or defer_model_check:
280287
self.model = model
@@ -343,6 +350,7 @@ def __init__(
343350
self._max_result_retries = output_retries if output_retries is not None else retries
344351
self._mcp_servers = mcp_servers
345352
self._prepare_tools = prepare_tools
353+
self.history_processors = history_processors or []
346354
for tool in tools:
347355
if isinstance(tool, Tool):
348356
self._register_tool(tool)
@@ -690,6 +698,7 @@ async def get_instructions(run_context: RunContext[AgentDepsT]) -> str | None:
690698
end_strategy=self.end_strategy,
691699
output_schema=output_schema,
692700
output_validators=output_validators,
701+
history_processors=self.history_processors,
693702
function_tools=run_function_tools,
694703
mcp_servers=self._mcp_servers,
695704
default_retries=self._default_retries,

0 commit comments

Comments
 (0)