Skip to content

Commit 9392485

Browse files
committed
Fix VectorStoreChatMemoryAdvisor streaming bug
- Override adviseStream method in VectorStoreChatMemoryAdvisor to properly handle streaming responses - Add tests to verify the fix works with both normal and problematic streaming scenarios Fixes #3152 Signed-off-by: Mark Pollack <mark.pollack@broadcom.com>
1 parent 20ccd1f commit 9392485

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

advisors/spring-ai-advisors-vector-store/src/main/java/org/springframework/ai/chat/client/advisor/vectorstore/VectorStoreChatMemoryAdvisor.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,18 @@
2121
import java.util.List;
2222
import java.util.Map;
2323

24+
import reactor.core.publisher.Flux;
25+
import reactor.core.publisher.Mono;
2426
import reactor.core.scheduler.Scheduler;
2527

28+
import org.springframework.ai.chat.client.ChatClientMessageAggregator;
2629
import org.springframework.ai.chat.client.ChatClientRequest;
2730
import org.springframework.ai.chat.client.ChatClientResponse;
2831
import org.springframework.ai.chat.client.advisor.api.Advisor;
2932
import org.springframework.ai.chat.client.advisor.api.AdvisorChain;
3033
import org.springframework.ai.chat.client.advisor.api.BaseAdvisor;
3134
import org.springframework.ai.chat.client.advisor.api.BaseChatMemoryAdvisor;
35+
import org.springframework.ai.chat.client.advisor.api.StreamAdvisorChain;
3236
import org.springframework.ai.chat.memory.ChatMemory;
3337
import org.springframework.ai.chat.messages.AssistantMessage;
3438
import org.springframework.ai.chat.messages.Message;
@@ -167,6 +171,20 @@ public ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorCh
167171
return chatClientResponse;
168172
}
169173

174+
@Override
175+
public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest,
176+
StreamAdvisorChain streamAdvisorChain) {
177+
// Get the scheduler from BaseAdvisor
178+
Scheduler scheduler = this.getScheduler();
179+
// Process the request with the before method
180+
return Mono.just(chatClientRequest)
181+
.publishOn(scheduler)
182+
.map(request -> this.before(request, streamAdvisorChain))
183+
.flatMapMany(streamAdvisorChain::nextStream)
184+
.transform(flux -> new ChatClientMessageAggregator().aggregateChatClientResponse(flux,
185+
response -> this.after(response, streamAdvisorChain)));
186+
}
187+
170188
private List<Document> toDocuments(List<Message> messages, String conversationId) {
171189
List<Document> docs = messages.stream()
172190
.filter(m -> m.getMessageType() == MessageType.USER || m.getMessageType() == MessageType.ASSISTANT)

vector-stores/spring-ai-pgvector-store/src/test/java/org/springframework/ai/vectorstore/pgvector/PgVectorStoreWithChatMemoryAdvisorIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.springframework.jdbc.core.JdbcTemplate;
4848

4949
import static org.assertj.core.api.Assertions.assertThat;
50+
import static org.assertj.core.api.Assertions.fail;
5051
import static org.mockito.ArgumentMatchers.any;
5152
import static org.mockito.BDDMockito.given;
5253
import static org.mockito.Mockito.mock;

0 commit comments

Comments
 (0)