Skip to content

Commit 777b79e

Browse files
Bragolgirithtzolov
authored andcommitted
Fix RedisVectorStore not closing Jedis pipelines
1 parent 4209d5a commit 777b79e

File tree

1 file changed

+31
-29
lines changed

1 file changed

+31
-29
lines changed

vector-stores/spring-ai-redis/src/main/java/org/springframework/ai/vectorstore/RedisVectorStore.java

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -303,25 +303,26 @@ public JedisPooled getJedis() {
303303

304304
@Override
305305
public void add(List<Document> documents) {
306-
Pipeline pipeline = this.jedis.pipelined();
307-
for (Document document : documents) {
308-
var embedding = this.embeddingClient.embed(document);
309-
document.setEmbedding(embedding);
310-
311-
var fields = new HashMap<String, Object>();
312-
fields.put(this.config.embeddingFieldName, embedding);
313-
fields.put(this.config.contentFieldName, document.getContent());
314-
fields.putAll(document.getMetadata());
315-
pipeline.jsonSetWithEscape(key(document.getId()), JSON_SET_PATH, fields);
316-
}
317-
List<Object> responses = pipeline.syncAndReturnAll();
318-
Optional<Object> errResponse = responses.stream().filter(Predicate.not(RESPONSE_OK)).findAny();
319-
if (errResponse.isPresent()) {
320-
String message = MessageFormat.format("Could not add document: {0}", errResponse.get());
321-
if (logger.isErrorEnabled()) {
322-
logger.error(message);
306+
try (Pipeline pipeline = this.jedis.pipelined()) {
307+
for (Document document : documents) {
308+
var embedding = this.embeddingClient.embed(document);
309+
document.setEmbedding(embedding);
310+
311+
var fields = new HashMap<String, Object>();
312+
fields.put(this.config.embeddingFieldName, embedding);
313+
fields.put(this.config.contentFieldName, document.getContent());
314+
fields.putAll(document.getMetadata());
315+
pipeline.jsonSetWithEscape(key(document.getId()), JSON_SET_PATH, fields);
316+
}
317+
List<Object> responses = pipeline.syncAndReturnAll();
318+
Optional<Object> errResponse = responses.stream().filter(Predicate.not(RESPONSE_OK)).findAny();
319+
if (errResponse.isPresent()) {
320+
String message = MessageFormat.format("Could not add document: {0}", errResponse.get());
321+
if (logger.isErrorEnabled()) {
322+
logger.error(message);
323+
}
324+
throw new RuntimeException(message);
323325
}
324-
throw new RuntimeException(message);
325326
}
326327
}
327328

@@ -331,19 +332,20 @@ private String key(String id) {
331332

332333
@Override
333334
public Optional<Boolean> delete(List<String> idList) {
334-
Pipeline pipeline = this.jedis.pipelined();
335-
for (String id : idList) {
336-
pipeline.jsonDel(key(id));
337-
}
338-
List<Object> responses = pipeline.syncAndReturnAll();
339-
Optional<Object> errResponse = responses.stream().filter(Predicate.not(RESPONSE_DEL_OK)).findAny();
340-
if (errResponse.isPresent()) {
341-
if (logger.isErrorEnabled()) {
342-
logger.error("Could not delete document: {}", errResponse.get());
335+
try (Pipeline pipeline = this.jedis.pipelined()) {
336+
for (String id : idList) {
337+
pipeline.jsonDel(key(id));
338+
}
339+
List<Object> responses = pipeline.syncAndReturnAll();
340+
Optional<Object> errResponse = responses.stream().filter(Predicate.not(RESPONSE_DEL_OK)).findAny();
341+
if (errResponse.isPresent()) {
342+
if (logger.isErrorEnabled()) {
343+
logger.error("Could not delete document: {}", errResponse.get());
344+
}
345+
return Optional.of(false);
343346
}
344-
return Optional.of(false);
347+
return Optional.of(true);
345348
}
346-
return Optional.of(true);
347349
}
348350

349351
@Override

0 commit comments

Comments
 (0)