diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index ea4930f894..9998a42cb5 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -48,8 +48,7 @@ protected RedisProtocol getProtocol() { } protected volatile CommandKeyArgumentPreProcessor keyPreProcessor = null; - private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null; - private Lock mapperLock = new ReentrantLock(true); + private Lock mapperLock = new ReentrantLock(true); private volatile JsonObjectMapper jsonObjectMapper; private final AtomicInteger searchDialect = new AtomicInteger(2); // DEFAULT_SEARCH_DIALECT = 2; @@ -58,10 +57,6 @@ void setKeyArgumentPreProcessor(CommandKeyArgumentPreProcessor keyPreProcessor) this.keyPreProcessor = keyPreProcessor; } - void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig config) { - this.broadcastAndRoundRobinConfig = config; - } - protected CommandArguments commandArguments(ProtocolCommand command) { CommandArguments comArgs = new CommandArguments(command); if (keyPreProcessor != null) comArgs.setKeyArgumentPreProcessor(keyPreProcessor); @@ -2941,7 +2936,7 @@ public final CommandObject> xreadGroup(byte[] groupName, byte[] con } public final CommandObject>>> xreadGroupBinary( - byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, + byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { CommandArguments args = commandArguments(XREADGROUP) .add(GROUP).add(groupName).add(consumer) @@ -2956,7 +2951,7 @@ public final CommandObject>>> xre } public final CommandObject>> xreadGroupBinaryAsMap( - byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, + byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { CommandArguments args = commandArguments(XREADGROUP) .add(GROUP).add(groupName).add(consumer) @@ -3445,18 +3440,16 @@ public final CommandObject hsetObject(String key, Map hash return new CommandObject<>(addFlatMapArgs(commandArguments(HSET).key(key), hash), BuilderFactory.LONG); } - private boolean isRoundRobinSearchCommand() { - if (broadcastAndRoundRobinConfig == null) { - return true; - } else if (broadcastAndRoundRobinConfig.getRediSearchModeInCluster() == JedisBroadcastAndRoundRobinConfig.RediSearchMode.LIGHT) { - return false; - } - return true; + private boolean isRoundRobinSearchCommand(SearchCommand sc) { + + return !(sc.equals(SearchCommand.SUGGET) || sc.equals(SearchCommand.SUGADD) || sc.equals( + SearchCommand.SUGLEN) || sc.equals(SearchCommand.SUGDEL) || sc.equals( + SearchCommand.CURSOR)); } private CommandArguments checkAndRoundRobinSearchCommand(SearchCommand sc, String idx) { CommandArguments ca = commandArguments(sc); - if (isRoundRobinSearchCommand()) { + if (isRoundRobinSearchCommand(sc)) { ca.add(idx); } else { ca.key(idx); @@ -3464,9 +3457,10 @@ private CommandArguments checkAndRoundRobinSearchCommand(SearchCommand sc, Strin return ca; } - private CommandArguments checkAndRoundRobinSearchCommand(SearchCommand sc, String idx1, String idx2) { + private CommandArguments checkAndRoundRobinSearchCommand(SearchCommand sc, String idx1, + String idx2) { CommandArguments ca = commandArguments(sc); - if (isRoundRobinSearchCommand()) { + if (isRoundRobinSearchCommand(sc)) { ca.add(idx1).add(idx2); } else { ca.key(idx1).key(idx2); @@ -3474,8 +3468,14 @@ private CommandArguments checkAndRoundRobinSearchCommand(SearchCommand sc, Strin return ca; } - private CommandArguments checkAndRoundRobinSearchCommand(CommandArguments commandArguments, byte[] indexName) { - return isRoundRobinSearchCommand() ? commandArguments.add(indexName) : commandArguments.key(indexName); + private CommandArguments checkAndRoundRobinSearchCommand(SearchCommand sc, byte[] indexName) { + CommandArguments ca = commandArguments(sc); + if (isRoundRobinSearchCommand(sc)) { + ca.add(indexName); + } else { + ca.key(indexName); + } + return ca; } private CommandObject directSearchCommand(CommandObject object, String indexName) { @@ -3556,7 +3556,7 @@ public final CommandObject ftSearch(byte[] indexName, Query query) if (protocol == RedisProtocol.RESP3) { throw new UnsupportedOperationException("binary ft.search is not implemented with resp3."); } - return new CommandObject<>(checkAndRoundRobinSearchCommand(commandArguments(SearchCommand.SEARCH), indexName) + return new CommandObject<>(checkAndRoundRobinSearchCommand(SearchCommand.SEARCH, indexName) .addParams(query.dialectOptional(searchDialect.get())), getSearchResultBuilder(null, () -> new SearchResultBuilder(!query.getNoContent(), query.getWithScores(), false))); } diff --git a/src/main/java/redis/clients/jedis/JedisBroadcastAndRoundRobinConfig.java b/src/main/java/redis/clients/jedis/JedisBroadcastAndRoundRobinConfig.java deleted file mode 100644 index 50b5b6efc0..0000000000 --- a/src/main/java/redis/clients/jedis/JedisBroadcastAndRoundRobinConfig.java +++ /dev/null @@ -1,10 +0,0 @@ -package redis.clients.jedis; - -public interface JedisBroadcastAndRoundRobinConfig { - - public enum RediSearchMode { - DEFAULT, LIGHT; - } - - RediSearchMode getRediSearchModeInCluster(); -} diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index e3960862fa..2bbec5f5c7 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -36,6 +36,7 @@ import redis.clients.jedis.providers.*; import redis.clients.jedis.resps.*; import redis.clients.jedis.search.*; +import redis.clients.jedis.search.aggr.AggregateIterator; import redis.clients.jedis.search.aggr.AggregationBuilder; import redis.clients.jedis.search.aggr.AggregationResult; import redis.clients.jedis.search.aggr.FtAggregateIteration; @@ -54,7 +55,6 @@ public class UnifiedJedis implements JedisCommands, JedisBinaryCommands, protected final ConnectionProvider provider; protected final CommandExecutor executor; protected final CommandObjects commandObjects; - private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null; private final Cache cache; public UnifiedJedis() { @@ -311,26 +311,12 @@ public final T executeCommand(CommandObject commandObject) { return executor.executeCommand(commandObject); } - public final T broadcastCommand(CommandObject commandObject) { - return executor.broadcastCommand(commandObject); - } - - private T checkAndBroadcastCommand(CommandObject commandObject) { - boolean broadcast = true; - - if (broadcastAndRoundRobinConfig == null) { - } else if (commandObject.getArguments().getCommand() instanceof SearchProtocol.SearchCommand - && broadcastAndRoundRobinConfig - .getRediSearchModeInCluster() == JedisBroadcastAndRoundRobinConfig.RediSearchMode.LIGHT) { - broadcast = false; - } - - return broadcast ? broadcastCommand(commandObject) : executeCommand(commandObject); + public final T executeKeylessCommand(CommandObject commandObject) { + return executor.executeKeylessCommand(commandObject); } - public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig config) { - this.broadcastAndRoundRobinConfig = config; - this.commandObjects.setBroadcastAndRoundRobinConfig(this.broadcastAndRoundRobinConfig); + public final T broadcastCommand(CommandObject commandObject) { + return executor.broadcastCommand(commandObject); } public Cache getCache() { @@ -338,19 +324,19 @@ public Cache getCache() { } public String ping() { - return checkAndBroadcastCommand(commandObjects.ping()); + return broadcastCommand(commandObjects.ping()); } public String flushDB() { - return checkAndBroadcastCommand(commandObjects.flushDB()); + return broadcastCommand(commandObjects.flushDB()); } public String flushAll() { - return checkAndBroadcastCommand(commandObjects.flushAll()); + return broadcastCommand(commandObjects.flushAll()); } public String configSet(String parameter, String value) { - return checkAndBroadcastCommand(commandObjects.configSet(parameter, value)); + return broadcastCommand(commandObjects.configSet(parameter, value)); } public String info() { @@ -3596,22 +3582,22 @@ public Object fcallReadonly(String name, List keys, List args) { @Override public String functionDelete(String libraryName) { - return checkAndBroadcastCommand(commandObjects.functionDelete(libraryName)); + return broadcastCommand(commandObjects.functionDelete(libraryName)); } @Override public String functionFlush() { - return checkAndBroadcastCommand(commandObjects.functionFlush()); + return broadcastCommand(commandObjects.functionFlush()); } @Override public String functionFlush(FlushMode mode) { - return checkAndBroadcastCommand(commandObjects.functionFlush(mode)); + return broadcastCommand(commandObjects.functionFlush(mode)); } @Override public String functionKill() { - return checkAndBroadcastCommand(commandObjects.functionKill()); + return broadcastCommand(commandObjects.functionKill()); } @Override @@ -3636,12 +3622,12 @@ public List functionListWithCode(String libraryNamePattern) { @Override public String functionLoad(String functionCode) { - return checkAndBroadcastCommand(commandObjects.functionLoad(functionCode)); + return broadcastCommand(commandObjects.functionLoad(functionCode)); } @Override public String functionLoadReplace(String functionCode) { - return checkAndBroadcastCommand(commandObjects.functionLoadReplace(functionCode)); + return broadcastCommand(commandObjects.functionLoadReplace(functionCode)); } @Override @@ -3661,7 +3647,7 @@ public Object fcallReadonly(byte[] name, List keys, List args) { @Override public String functionDelete(byte[] libraryName) { - return checkAndBroadcastCommand(commandObjects.functionDelete(libraryName)); + return broadcastCommand(commandObjects.functionDelete(libraryName)); } @Override @@ -3691,22 +3677,22 @@ public List functionListWithCode(final byte[] libraryNamePattern) { @Override public String functionLoad(byte[] functionCode) { - return checkAndBroadcastCommand(commandObjects.functionLoad(functionCode)); + return broadcastCommand(commandObjects.functionLoad(functionCode)); } @Override public String functionLoadReplace(byte[] functionCode) { - return checkAndBroadcastCommand(commandObjects.functionLoadReplace(functionCode)); + return broadcastCommand(commandObjects.functionLoadReplace(functionCode)); } @Override public String functionRestore(byte[] serializedValue) { - return checkAndBroadcastCommand(commandObjects.functionRestore(serializedValue)); + return broadcastCommand(commandObjects.functionRestore(serializedValue)); } @Override public String functionRestore(byte[] serializedValue, FunctionRestorePolicy policy) { - return checkAndBroadcastCommand(commandObjects.functionRestore(serializedValue, policy)); + return broadcastCommand(commandObjects.functionRestore(serializedValue, policy)); } @Override @@ -3819,7 +3805,7 @@ public Object evalsha(byte[] sha1, byte[] sampleKey) { } public List scriptExists(List sha1s) { - return checkAndBroadcastCommand(commandObjects.scriptExists(sha1s)); + return broadcastCommand(commandObjects.scriptExists(sha1s)); } @Override @@ -3843,7 +3829,7 @@ public List scriptExists(byte[] sampleKey, byte[]... sha1s) { } public String scriptLoad(String script) { - return checkAndBroadcastCommand(commandObjects.scriptLoad(script)); + return broadcastCommand(commandObjects.scriptLoad(script)); } @Override @@ -3852,7 +3838,7 @@ public String scriptLoad(String script, String sampleKey) { } public String scriptFlush() { - return checkAndBroadcastCommand(commandObjects.scriptFlush()); + return broadcastCommand(commandObjects.scriptFlush()); } @Override @@ -3866,7 +3852,7 @@ public String scriptFlush(String sampleKey, FlushMode flushMode) { } public String scriptKill() { - return checkAndBroadcastCommand(commandObjects.scriptKill()); + return broadcastCommand(commandObjects.scriptKill()); } @Override @@ -3895,7 +3881,7 @@ public String scriptKill(byte[] sampleKey) { } public String slowlogReset() { - return checkAndBroadcastCommand(commandObjects.slowlogReset()); + return broadcastCommand(commandObjects.slowlogReset()); } // Sample key commands @@ -3944,57 +3930,57 @@ public long hsetObject(String key, Map hash) { @Override public String ftCreate(String indexName, IndexOptions indexOptions, Schema schema) { - return checkAndBroadcastCommand(commandObjects.ftCreate(indexName, indexOptions, schema)); + return executeKeylessCommand(commandObjects.ftCreate(indexName, indexOptions, schema)); } @Override public String ftCreate(String indexName, FTCreateParams createParams, Iterable schemaFields) { - return checkAndBroadcastCommand(commandObjects.ftCreate(indexName, createParams, schemaFields)); + return executeKeylessCommand(commandObjects.ftCreate(indexName, createParams, schemaFields)); } @Override public String ftAlter(String indexName, Schema schema) { - return checkAndBroadcastCommand(commandObjects.ftAlter(indexName, schema)); + return executeKeylessCommand(commandObjects.ftAlter(indexName, schema)); } @Override public String ftAlter(String indexName, Iterable schemaFields) { - return checkAndBroadcastCommand(commandObjects.ftAlter(indexName, schemaFields)); + return executeKeylessCommand(commandObjects.ftAlter(indexName, schemaFields)); } @Override public String ftAliasAdd(String aliasName, String indexName) { - return checkAndBroadcastCommand(commandObjects.ftAliasAdd(aliasName, indexName)); + return executeKeylessCommand(commandObjects.ftAliasAdd(aliasName, indexName)); } @Override public String ftAliasUpdate(String aliasName, String indexName) { - return checkAndBroadcastCommand(commandObjects.ftAliasUpdate(aliasName, indexName)); + return executeKeylessCommand(commandObjects.ftAliasUpdate(aliasName, indexName)); } @Override public String ftAliasDel(String aliasName) { - return checkAndBroadcastCommand(commandObjects.ftAliasDel(aliasName)); + return executeKeylessCommand(commandObjects.ftAliasDel(aliasName)); } @Override public String ftDropIndex(String indexName) { - return checkAndBroadcastCommand(commandObjects.ftDropIndex(indexName)); + return executeKeylessCommand(commandObjects.ftDropIndex(indexName)); } @Override public String ftDropIndexDD(String indexName) { - return checkAndBroadcastCommand(commandObjects.ftDropIndexDD(indexName)); + return executeKeylessCommand(commandObjects.ftDropIndexDD(indexName)); } @Override public SearchResult ftSearch(String indexName, String query) { - return executeCommand(commandObjects.ftSearch(indexName, query)); + return executeKeylessCommand(commandObjects.ftSearch(indexName, query)); } @Override public SearchResult ftSearch(String indexName, String query, FTSearchParams params) { - return executeCommand(commandObjects.ftSearch(indexName, query, params)); + return executeKeylessCommand(commandObjects.ftSearch(indexName, query, params)); } /** @@ -4006,13 +3992,14 @@ public SearchResult ftSearch(String indexName, String query, FTSearchParams para * @param params limit will be ignored * @return search iteration */ + @Deprecated public FtSearchIteration ftSearchIteration(int batchSize, String indexName, String query, FTSearchParams params) { return new FtSearchIteration(provider, commandObjects.getProtocol(), batchSize, indexName, query, params); } @Override public SearchResult ftSearch(String indexName, Query query) { - return executeCommand(commandObjects.ftSearch(indexName, query)); + return executeKeylessCommand(commandObjects.ftSearch(indexName, query)); } /** @@ -4022,6 +4009,7 @@ public SearchResult ftSearch(String indexName, Query query) { * @param query limit will be ignored * @return search iteration */ + @Deprecated public FtSearchIteration ftSearchIteration(int batchSize, String indexName, Query query) { return new FtSearchIteration(provider, commandObjects.getProtocol(), batchSize, indexName, query); } @@ -4029,17 +4017,17 @@ public FtSearchIteration ftSearchIteration(int batchSize, String indexName, Quer @Override @Deprecated public SearchResult ftSearch(byte[] indexName, Query query) { - return executeCommand(commandObjects.ftSearch(indexName, query)); + return executeKeylessCommand(commandObjects.ftSearch(indexName, query)); } @Override public String ftExplain(String indexName, Query query) { - return executeCommand(commandObjects.ftExplain(indexName, query)); + return executeKeylessCommand(commandObjects.ftExplain(indexName, query)); } @Override public List ftExplainCLI(String indexName, Query query) { - return executeCommand(commandObjects.ftExplainCLI(indexName, query)); + return executeKeylessCommand(commandObjects.ftExplainCLI(indexName, query)); } @Override @@ -4063,87 +4051,117 @@ public String ftCursorDel(String indexName, long cursorId) { * @param aggr cursor must be set * @return aggregate iteration */ + @Deprecated public FtAggregateIteration ftAggregateIteration(String indexName, AggregationBuilder aggr) { return new FtAggregateIteration(provider, indexName, aggr); } + /** + * Creates an iterator for aggregation results with cursor support. + * This method provides a clean, connection-aware iterator that ensures cursor operations + * are executed on the same Redis node. + * + *

Usage example: + *

{@code
+   * AggregationBuilder aggr = new AggregationBuilder()
+   *     .groupBy("@category", Reducers.sum("@price").as("total"))
+   *     .cursor(50, 30000);
+   *
+   * try (AggregateIterator iterator = jedis.ftAggregateIterator("products", aggr)) {
+   *     while (iterator.hasNext()) {
+   *         AggregationResult batch = iterator.next();
+   *         // Process batch - access rows via batch.getRows()
+   *     }
+   * }
+   * }
+ * + * @param indexName the search index name + * @param aggr aggregation builder with cursor configuration + * @return aggregate iterator for cursor-based pagination + * @throws IllegalArgumentException if aggregation doesn't have cursor configured + * @since 6.1.0 + */ + public AggregateIterator ftAggregateIterator(String indexName, AggregationBuilder aggr) { + return new AggregateIterator(provider, indexName, aggr); + } + @Override public Map.Entry ftProfileAggregate(String indexName, FTProfileParams profileParams, AggregationBuilder aggr) { - return executeCommand(commandObjects.ftProfileAggregate(indexName, profileParams, aggr)); + return executeKeylessCommand(commandObjects.ftProfileAggregate(indexName, profileParams, aggr)); } @Override public Map.Entry ftProfileSearch(String indexName, FTProfileParams profileParams, Query query) { - return executeCommand(commandObjects.ftProfileSearch(indexName, profileParams, query)); + return executeKeylessCommand(commandObjects.ftProfileSearch(indexName, profileParams, query)); } @Override public Map.Entry ftProfileSearch(String indexName, FTProfileParams profileParams, String query, FTSearchParams searchParams) { - return executeCommand(commandObjects.ftProfileSearch(indexName, profileParams, query, searchParams)); + return executeKeylessCommand(commandObjects.ftProfileSearch(indexName, profileParams, query, searchParams)); } @Override public String ftSynUpdate(String indexName, String synonymGroupId, String... terms) { - return executeCommand(commandObjects.ftSynUpdate(indexName, synonymGroupId, terms)); + return executeKeylessCommand(commandObjects.ftSynUpdate(indexName, synonymGroupId, terms)); } @Override public Map> ftSynDump(String indexName) { - return executeCommand(commandObjects.ftSynDump(indexName)); + return executeKeylessCommand(commandObjects.ftSynDump(indexName)); } @Override public long ftDictAdd(String dictionary, String... terms) { - return executeCommand(commandObjects.ftDictAdd(dictionary, terms)); + return executeKeylessCommand(commandObjects.ftDictAdd(dictionary, terms)); } @Override public long ftDictDel(String dictionary, String... terms) { - return executeCommand(commandObjects.ftDictDel(dictionary, terms)); + return executeKeylessCommand(commandObjects.ftDictDel(dictionary, terms)); } @Override public Set ftDictDump(String dictionary) { - return executeCommand(commandObjects.ftDictDump(dictionary)); + return executeKeylessCommand(commandObjects.ftDictDump(dictionary)); } @Override public long ftDictAddBySampleKey(String indexName, String dictionary, String... terms) { - return executeCommand(commandObjects.ftDictAddBySampleKey(indexName, dictionary, terms)); + return executeKeylessCommand(commandObjects.ftDictAddBySampleKey(indexName, dictionary, terms)); } @Override public long ftDictDelBySampleKey(String indexName, String dictionary, String... terms) { - return executeCommand(commandObjects.ftDictDelBySampleKey(indexName, dictionary, terms)); + return executeKeylessCommand(commandObjects.ftDictDelBySampleKey(indexName, dictionary, terms)); } @Override public Set ftDictDumpBySampleKey(String indexName, String dictionary) { - return executeCommand(commandObjects.ftDictDumpBySampleKey(indexName, dictionary)); + return executeKeylessCommand(commandObjects.ftDictDumpBySampleKey(indexName, dictionary)); } @Override public Map> ftSpellCheck(String index, String query) { - return executeCommand(commandObjects.ftSpellCheck(index, query)); + return executeKeylessCommand(commandObjects.ftSpellCheck(index, query)); } @Override public Map> ftSpellCheck(String index, String query, FTSpellCheckParams spellCheckParams) { - return executeCommand(commandObjects.ftSpellCheck(index, query, spellCheckParams)); + return executeKeylessCommand(commandObjects.ftSpellCheck(index, query, spellCheckParams)); } @Override public Map ftInfo(String indexName) { - return executeCommand(commandObjects.ftInfo(indexName)); + return executeKeylessCommand(commandObjects.ftInfo(indexName)); } @Override public Set ftTagVals(String indexName, String fieldName) { - return executeCommand(commandObjects.ftTagVals(indexName, fieldName)); + return executeKeylessCommand(commandObjects.ftTagVals(indexName, fieldName)); } @Override diff --git a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java index 3dbcac0a8c..5a49de5087 100644 --- a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java @@ -2,9 +2,12 @@ import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +29,9 @@ public class ClusterCommandExecutor implements CommandExecutor { protected final int maxAttempts; protected final Duration maxTotalRetriesDuration; + // Round-robin counter for keyless command distribution + private final AtomicInteger roundRobinCounter = new AtomicInteger(0); + public ClusterCommandExecutor(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) { this.provider = provider; @@ -76,6 +82,52 @@ public final T executeCommand(CommandObject commandObject) { return doExecuteCommand(commandObject, false); } + @Override + public final T executeKeylessCommand(CommandObject commandObject) { + Instant deadline = Instant.now().plus(maxTotalRetriesDuration); + int consecutiveConnectionFailures = 0; + Exception lastException = null; + + for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) { + Connection connection = null; + try { + // Use round-robin distribution for keyless commands + connection = getNextConnection(); + return execute(connection, commandObject); + + } catch (JedisConnectionException jce) { + lastException = jce; + ++consecutiveConnectionFailures; + log.debug("Failed connecting to Redis: {}", connection, jce); + + if (consecutiveConnectionFailures < 2) { + continue; + } + + boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline); + if (reset) { + consecutiveConnectionFailures = 0; + } + } catch (JedisRedirectionException jre) { + // For keyless commands, we don't follow redirections since we're not targeting a specific slot + // Just retry with a different random node + lastException = jre; + log.debug("Received redirection for keyless command, retrying with different node: {}", jre.getMessage()); + consecutiveConnectionFailures = 0; + } finally { + IOUtils.closeQuietly(connection); + } + if (Instant.now().isAfter(deadline)) { + throw new JedisClusterOperationException("Cluster retry deadline exceeded.", lastException); + } + } + + JedisClusterOperationException maxAttemptsException + = new JedisClusterOperationException("No more cluster attempts left."); + maxAttemptsException.addSuppressed(lastException); + throw maxAttemptsException; + } + public final T executeCommandToReplica(CommandObject commandObject) { return doExecuteCommand(commandObject, true); } @@ -141,6 +193,32 @@ private T doExecuteCommand(CommandObject commandObject, boolean toReplica throw maxAttemptsException; } + /** + * Gets a connection using round-robin distribution across all cluster nodes. + * This ensures even distribution of keyless commands across the cluster. + * + * @return Connection from the next node in round-robin sequence + * @throws JedisClusterOperationException if no cluster nodes are available + */ + private Connection getNextConnection() { + Map connectionMap = provider.getConnectionMap(); + + if (connectionMap.isEmpty()) { + throw new JedisClusterOperationException("No cluster nodes available."); + } + + // Convert connection map to list for round-robin access + List> nodeList = new ArrayList<>(connectionMap.entrySet()); + + // Select node using round-robin distribution for true unified distribution + // Use modulo directly on the node list size to create a circular counter + int roundRobinIndex = roundRobinCounter.getAndUpdate(current -> (current + 1) % nodeList.size()); + Map.Entry selectedEntry = nodeList.get(roundRobinIndex); + ConnectionPool pool = selectedEntry.getValue(); + + return pool.getResource(); + } + /** * WARNING: This method is accessible for the purpose of testing. * This should not be used or overriden. diff --git a/src/main/java/redis/clients/jedis/executors/CommandExecutor.java b/src/main/java/redis/clients/jedis/executors/CommandExecutor.java index 85ec034b02..a0a25bf975 100644 --- a/src/main/java/redis/clients/jedis/executors/CommandExecutor.java +++ b/src/main/java/redis/clients/jedis/executors/CommandExecutor.java @@ -9,4 +9,8 @@ public interface CommandExecutor extends AutoCloseable { default T broadcastCommand(CommandObject commandObject) { return executeCommand(commandObject); } + + default T executeKeylessCommand(CommandObject commandObject) { + return executeCommand(commandObject); + } } diff --git a/src/main/java/redis/clients/jedis/search/FtSearchIteration.java b/src/main/java/redis/clients/jedis/search/FtSearchIteration.java index c856e5d780..aa5993fd75 100644 --- a/src/main/java/redis/clients/jedis/search/FtSearchIteration.java +++ b/src/main/java/redis/clients/jedis/search/FtSearchIteration.java @@ -9,6 +9,14 @@ import redis.clients.jedis.search.SearchResult.SearchResultBuilder; import redis.clients.jedis.util.JedisCommandIterationBase; +/** + * Iterator for FT.SEARCH results across cluster nodes. + * + * @deprecated Since Redis 8.0, FT.SEARCH automatically retrieves results from all cluster nodes, + * eliminating the need for manual iteration across nodes. Use the standard + * {@code ftSearch} methods directly instead. + */ +@Deprecated public class FtSearchIteration extends JedisCommandIterationBase { private int batchStart; diff --git a/src/main/java/redis/clients/jedis/search/RediSearchCommands.java b/src/main/java/redis/clients/jedis/search/RediSearchCommands.java index 451f2e58bb..5e9c6615d7 100644 --- a/src/main/java/redis/clients/jedis/search/RediSearchCommands.java +++ b/src/main/java/redis/clients/jedis/search/RediSearchCommands.java @@ -7,6 +7,7 @@ import redis.clients.jedis.commands.ConfigCommands; import redis.clients.jedis.resps.Tuple; +import redis.clients.jedis.search.aggr.AggregateIterator; import redis.clients.jedis.search.aggr.AggregationBuilder; import redis.clients.jedis.search.aggr.AggregationResult; import redis.clients.jedis.search.schemafields.SchemaField; @@ -68,10 +69,47 @@ default SearchResult ftSearch(String indexName) { List ftExplainCLI(String indexName, Query query); + /** + * Execute an aggregation query and return all results at once. + * Use this method when you don't need cursor-based pagination and want to retrieve + * all results in a single operation. + * @param indexName the index name + * @param aggr the aggregation builder containing the query + * @return the complete aggregation result + */ AggregationResult ftAggregate(String indexName, AggregationBuilder aggr); + /** + * Execute an aggregation query with cursor-based iteration support. + * Use this method when you need to paginate through large result sets or want + * to process results incrementally using cursor-based iteration. + * @param indexName the index name + * @param aggr the aggregation builder containing the query (should include cursor configuration) + * @return an iterator for cursor-based result pagination + */ + AggregateIterator ftAggregateIterator(String indexName, AggregationBuilder aggr); + + /** + * Read from an aggregation cursor. + * @param indexName the index name + * @param cursorId the cursor ID + * @param count the number of results to read + * @return the aggregation result + * @deprecated Use {@link #ftAggregate(String, AggregationBuilder)} for operations without cursors, + * or {@link #ftAggregateIterator(String, AggregationBuilder)} for cursor-based iteration + */ + @Deprecated AggregationResult ftCursorRead(String indexName, long cursorId, int count); + /** + * Delete an aggregation cursor. + * @param indexName the index name + * @param cursorId the cursor ID + * @return the result of the cursor deletion + * @deprecated Use {@link #ftAggregate(String, AggregationBuilder)} for operations without cursors, + * or {@link #ftAggregateIterator(String, AggregationBuilder)} for cursor-based iteration + */ + @Deprecated String ftCursorDel(String indexName, long cursorId); Map.Entry ftProfileAggregate(String indexName, diff --git a/src/main/java/redis/clients/jedis/search/aggr/AggregateIterator.java b/src/main/java/redis/clients/jedis/search/aggr/AggregateIterator.java new file mode 100644 index 0000000000..3ed7db6263 --- /dev/null +++ b/src/main/java/redis/clients/jedis/search/aggr/AggregateIterator.java @@ -0,0 +1,203 @@ +package redis.clients.jedis.search.aggr; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import redis.clients.jedis.Connection; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.providers.ConnectionProvider; +import redis.clients.jedis.search.SearchProtocol; +import redis.clients.jedis.util.IOUtils; + +/** + * Iterator for Redis search aggregation results with cursor support. + * This class manages the connection to a specific Redis node and handles + * cursor-based pagination for large aggregation results. + * + *

The iterator supports the {@link #remove()} method which deletes the cursor + * on the server and terminates the iteration, freeing server resources immediately. + * + *

Usage example: + *

{@code
+ * AggregationBuilder aggr = new AggregationBuilder()
+ *     .groupBy("@field")
+ *     .cursor(100, 60000); // 100 results per batch, 60 second TTL for the cursor
+ *
+ * try (AggregateIterator iterator = new AggregateIterator(provider, "myindex", aggr)) {
+ *     while (iterator.hasNext()) {
+ *         AggregationResult batch = iterator.next();
+ *
+ *         if (batch.isEmpty()) {
+ *             break; // FT.AGGREGATE returned empty result set
+ *         }
+ *
+ *         // Process batch - access rows via batch.getRows()
+ *
+ *         // Optionally terminate early and free server resources
+ *         if (someCondition) {
+ *             iterator.remove(); // Deletes cursor and stops iteration
+ *             break;
+ *         }
+ *     }
+ * }
+ * }
+ */ +public class AggregateIterator implements Iterator, Closeable { + + private final ConnectionProvider connectionProvider; + private final String indexName; + private final Integer batchSize; + + private Connection connection; + private Long cursorId = -1L; + private AggregationResult aggrCommandResult; + + /** + * Creates a new AggregateIterator. + * + * @param connectionProvider the connection provider for cluster/standalone Redis + * @param indexName the search index name + * @param aggregationBuilder the aggregation query with cursor configuration + * @throws IllegalArgumentException if aggregation doesn't have cursor configured + */ + public AggregateIterator(ConnectionProvider connectionProvider, String indexName, + AggregationBuilder aggregationBuilder) { + if (!aggregationBuilder.isWithCursor()) { + throw new IllegalArgumentException("AggregationBuilder must have cursor configured"); + } + + this.connectionProvider = connectionProvider; + this.indexName = indexName; + this.batchSize = aggregationBuilder.getCursorCount(); + + // Get a dedicated connection for this cursor session + this.connection = acquireConnection(aggregationBuilder); + } + + @Override + public boolean hasNext() { + return aggrCommandResult != null || cursorId > 0; + } + + @Override + public AggregationResult next() { + if (!hasNext()) { + throw new NoSuchElementException("No more aggregation results available"); + } + + try { + if (aggrCommandResult != null) { + try { + return aggrCommandResult; + } finally { + aggrCommandResult = null; + } + } else { + return doFetch(); + } + + } catch (Exception e) { + throw new JedisException("Failed to fetch next aggregation batch", e); + } + } + + /** + * Returns the current cursor ID. + * + * @return cursor ID, or null if not initialized + */ + public Long getCursorId() { + return cursorId; + } + + @Override + public void remove() { + aggrCommandResult = null; + + if (cursorId == null || cursorId <= 0) { + // Cursor is already closed or not initialized, nothing to do + return; + } + + deleteCursor(); + // Mark cursor as deleted to prevent further operations + cursorId = -1L; + } + + @Override + public void close() { + deleteCursor(); + // Mark cursor as closed to prevent further operations + cursorId = -1L; + IOUtils.closeQuietly(connection); + } + + /** + * Deletes the cursor on the server to free resources. + * This method is idempotent and safe to call multiple times. + */ + private void deleteCursor() { + if (cursorId != null && cursorId > 0) { + try { + // Delete the cursor to free server resources + connection.executeCommand( + new redis.clients.jedis.CommandArguments(SearchProtocol.SearchCommand.CURSOR) + .add(SearchProtocol.SearchKeyword.DEL) + .add(indexName) + .add(cursorId) + ); + } catch (Exception e) { + // Log but don't throw - cursor will expire naturally + System.err.println("Warning: Failed to delete cursor " + cursorId + ": " + e.getMessage()); + } + } + } + + private AggregationResult doFetch() { + if (cursorId == null || cursorId <= 0) { + return null; + } + + redis.clients.jedis.CommandArguments args = new redis.clients.jedis.CommandArguments(SearchProtocol.SearchCommand.CURSOR) + .add(SearchProtocol.SearchKeyword.READ) + .add(indexName) + .add(cursorId); + + // Only add COUNT argument if a batch size was explicitly specified + if (batchSize != null) { + args.add(SearchProtocol.SearchKeyword.COUNT).add(batchSize); + } + + Object rawReply = connection.executeCommand(args); + AggregationResult result = AggregationResult.SEARCH_AGGREGATION_RESULT_WITH_CURSOR.build(rawReply); + + cursorId = result.getCursorId(); + return result; + } + + private Connection acquireConnection(AggregationBuilder aggregationBuilder) { + // Create the initial FT.AGGREGATE command + redis.clients.jedis.CommandArguments args = new redis.clients.jedis.CommandArguments(SearchProtocol.SearchCommand.AGGREGATE) + .add(indexName) + .addParams(aggregationBuilder); + + Connection conn = null; + try { + // Get connection and execute initial command + conn = connectionProvider.getConnection(args); + Object rawReply = conn.executeCommand(args); + aggrCommandResult = AggregationResult.SEARCH_AGGREGATION_RESULT_WITH_CURSOR.build(rawReply); + + cursorId = aggrCommandResult.getCursorId(); + + return conn; + + } catch (Exception e) { + IOUtils.closeQuietly(conn); + throw new JedisException("Failed to initialize aggregation cursor", e); + } + } + + +} diff --git a/src/main/java/redis/clients/jedis/search/aggr/AggregationBuilder.java b/src/main/java/redis/clients/jedis/search/aggr/AggregationBuilder.java index ec478b3367..af8cb33d28 100644 --- a/src/main/java/redis/clients/jedis/search/aggr/AggregationBuilder.java +++ b/src/main/java/redis/clients/jedis/search/aggr/AggregationBuilder.java @@ -22,6 +22,7 @@ public class AggregationBuilder implements IParams { private final List aggrArgs = new ArrayList<>(); private Integer dialect; private boolean isWithCursor = false; + private Integer cursorCount; public AggregationBuilder(String query) { aggrArgs.add(query); @@ -143,6 +144,7 @@ public AggregationBuilder filter(String expression) { public AggregationBuilder cursor(int count) { isWithCursor = true; + this.cursorCount = count; aggrArgs.add(SearchKeyword.WITHCURSOR); aggrArgs.add(SearchKeyword.COUNT); aggrArgs.add(count); @@ -151,6 +153,7 @@ public AggregationBuilder cursor(int count) { public AggregationBuilder cursor(int count, long maxIdle) { isWithCursor = true; + this.cursorCount = count; aggrArgs.add(SearchKeyword.WITHCURSOR); aggrArgs.add(SearchKeyword.COUNT); aggrArgs.add(count); @@ -206,6 +209,15 @@ public boolean isWithCursor() { return isWithCursor; } + /** + * Returns the cursor count (batch size) if cursor is configured. + * + * @return cursor count, or null if cursor is not configured + */ + public Integer getCursorCount() { + return cursorCount; + } + @Override public void addParams(CommandArguments commArgs) { commArgs.addObjects(aggrArgs); diff --git a/src/main/java/redis/clients/jedis/search/aggr/AggregationResult.java b/src/main/java/redis/clients/jedis/search/aggr/AggregationResult.java index 3eba4ac1d1..c43edf6501 100644 --- a/src/main/java/redis/clients/jedis/search/aggr/AggregationResult.java +++ b/src/main/java/redis/clients/jedis/search/aggr/AggregationResult.java @@ -65,6 +65,14 @@ public List getWarnings() { return warnings; } + /** + * Tests if this aggregation result is empty. + * @return true if there are no warnings and no rows, false otherwise + */ + public boolean isEmpty() { + return (warnings == null || warnings.isEmpty()) && (results == null || results.isEmpty()); + } + public static final Builder SEARCH_AGGREGATION_RESULT = new Builder() { private static final String TOTAL_RESULTS_STR = "total_results"; diff --git a/src/main/java/redis/clients/jedis/search/aggr/FtAggregateIteration.java b/src/main/java/redis/clients/jedis/search/aggr/FtAggregateIteration.java index 931834ed49..9c1e209ed3 100644 --- a/src/main/java/redis/clients/jedis/search/aggr/FtAggregateIteration.java +++ b/src/main/java/redis/clients/jedis/search/aggr/FtAggregateIteration.java @@ -7,6 +7,12 @@ import redis.clients.jedis.search.SearchProtocol; import redis.clients.jedis.util.JedisCommandIterationBase; +/** + * @deprecated Since Redis 8.0, FT.AGGREGATE automatically retrieves results from all cluster nodes, + * eliminating the need for manual iteration across nodes. Use {@link AggregateIterator} + * instead, which provides better cursor management and connection handling. + */ +@Deprecated public class FtAggregateIteration extends JedisCommandIterationBase { private final String indexName; diff --git a/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java b/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java index d6437e84ea..3585d003ec 100644 --- a/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java +++ b/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java @@ -7,9 +7,16 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongConsumer; import org.hamcrest.MatcherAssert; @@ -19,7 +26,6 @@ import org.mockito.InOrder; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import redis.clients.jedis.exceptions.JedisAskDataException; import redis.clients.jedis.exceptions.JedisClusterOperationException; @@ -357,4 +363,409 @@ protected void sleep(long sleepMillis) { inOrder.verifyNoMoreInteractions(); assertEquals(0L, totalSleepMs.get()); } + + @Test + public void runSuccessfulExecuteKeylessCommand() { + ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); + Map connectionMap = new HashMap<>(); + ConnectionPool pool = mock(ConnectionPool.class); + Connection connection = mock(Connection.class); + + connectionMap.put("localhost:6379", pool); + when(connectionHandler.getConnectionMap()).thenReturn(connectionMap); + when(pool.getResource()).thenReturn(connection); + + ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO) { + @Override + public T execute(Connection connection, CommandObject commandObject) { + return (T) "keyless_result"; + } + @Override + protected void sleep(long ignored) { + throw new RuntimeException("This test should never sleep"); + } + }; + assertEquals("keyless_result", testMe.executeKeylessCommand(STR_COM_OBJECT)); + } + + @Test + public void runKeylessCommandUsesConnectionMapRoundRobin() { + ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); + Map connectionMap = new HashMap<>(); + ConnectionPool pool = mock(ConnectionPool.class); + Connection connection = mock(Connection.class); + + connectionMap.put("localhost:6379", pool); + when(connectionHandler.getConnectionMap()).thenReturn(connectionMap); + when(pool.getResource()).thenReturn(connection); + + ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO) { + @Override + public T execute(Connection connection, CommandObject commandObject) { + return (T) "keyless_result"; + } + @Override + protected void sleep(long ignored) { + throw new RuntimeException("This test should never sleep"); + } + }; + + testMe.executeKeylessCommand(STR_COM_OBJECT); + + // Verify that getConnectionMap() was called for round-robin distribution + InOrder inOrder = inOrder(connectionHandler, pool, connection); + inOrder.verify(connectionHandler).getConnectionMap(); + inOrder.verify(pool).getResource(); + inOrder.verify(connection).close(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void runKeylessCommandIgnoresRedirections() { + ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); + Map connectionMap = new HashMap<>(); + ConnectionPool pool = mock(ConnectionPool.class); + Connection connection1 = mock(Connection.class); + Connection connection2 = mock(Connection.class); + final HostAndPort movedTarget = new HostAndPort(null, 0); + + connectionMap.put("localhost:6379", pool); + when(connectionHandler.getConnectionMap()).thenReturn(connectionMap); + when(pool.getResource()).thenReturn(connection1, connection2); + + ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, ONE_SECOND) { + boolean isFirstCall = true; + + @Override + public T execute(Connection connection, CommandObject commandObject) { + if (isFirstCall) { + isFirstCall = false; + // Keyless commands should ignore redirections and retry with different random node + throw new JedisMovedDataException("", movedTarget, 0); + } + return (T) "keyless_result"; + } + + @Override + protected void sleep(long ignored) { + throw new RuntimeException("This test should never sleep"); + } + }; + + assertEquals("keyless_result", testMe.executeKeylessCommand(STR_COM_OBJECT)); + + // Verify that we called getConnectionMap() twice (first failed with redirection, second succeeded) + // and that we didn't follow the redirection to a specific node + verify(connectionHandler, times(2)).getConnectionMap(); + verify(pool, times(2)).getResource(); + verify(connection1).close(); + verify(connection2).close(); + } + + @Test + public void runKeylessCommandFailsAfterMaxAttempts() { + ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); + Map connectionMap = new HashMap<>(); + ConnectionPool pool = mock(ConnectionPool.class); + Connection connection1 = mock(Connection.class); + Connection connection2 = mock(Connection.class); + Connection connection3 = mock(Connection.class); + final LongConsumer sleep = mock(LongConsumer.class); + + connectionMap.put("localhost:6379", pool); + when(connectionHandler.getConnectionMap()).thenReturn(connectionMap); + when(pool.getResource()).thenReturn(connection1, connection2, connection3); + + ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 3, ONE_SECOND) { + @Override + public T execute(Connection connection, CommandObject commandObject) { + throw new JedisConnectionException("Connection failed"); + } + + @Override + protected void sleep(long sleepMillis) { + sleep.accept(sleepMillis); + } + }; + + try { + testMe.executeKeylessCommand(STR_COM_OBJECT); + fail("keyless command did not fail"); + } catch (JedisClusterOperationException e) { + // expected + } + + // Verify that we tried connection map access and performed slot cache renewal + // getConnectionMap() called 3 times (once for each connection attempt) + // getResource() called 3 times, sleep called once, renewSlotCache called once + verify(connectionHandler, times(3)).getConnectionMap(); + verify(pool, times(3)).getResource(); + verify(connection1).close(); + verify(connection2).close(); + verify(connection3).close(); + verify(sleep).accept(ArgumentMatchers.anyLong()); + verify(connectionHandler).renewSlotCache(); + } + + @Test + public void runKeylessCommandFailsWithEmptyConnectionMap() { + ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); + Map emptyConnectionMap = new HashMap<>(); + + when(connectionHandler.getConnectionMap()).thenReturn(emptyConnectionMap); + + ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 3, ONE_SECOND) { + @Override + public T execute(Connection connection, CommandObject commandObject) { + return (T) "should_not_reach_here"; + } + @Override + protected void sleep(long ignored) { + throw new RuntimeException("This test should never sleep"); + } + }; + + try { + testMe.executeKeylessCommand(STR_COM_OBJECT); + fail("keyless command should fail with empty connection map"); + } catch (JedisClusterOperationException e) { + assertEquals("No cluster nodes available.", e.getMessage()); + } + + // Verify that getConnectionMap() was called + verify(connectionHandler).getConnectionMap(); + } + + @Test + public void runKeylessCommandRoundRobinDistribution() { + ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); + Map connectionMap = new HashMap<>(); + + // Create multiple pools to test round-robin + ConnectionPool pool1 = mock(ConnectionPool.class); + ConnectionPool pool2 = mock(ConnectionPool.class); + ConnectionPool pool3 = mock(ConnectionPool.class); + + Connection connection1 = mock(Connection.class); + Connection connection2 = mock(Connection.class); + Connection connection3 = mock(Connection.class); + + connectionMap.put("localhost:6379", pool1); + connectionMap.put("localhost:6380", pool2); + connectionMap.put("localhost:6381", pool3); + + when(connectionHandler.getConnectionMap()).thenReturn(connectionMap); + when(pool1.getResource()).thenReturn(connection1); + when(pool2.getResource()).thenReturn(connection2); + when(pool3.getResource()).thenReturn(connection3); + + // Track which connections are used + List usedConnections = new ArrayList<>(); + + ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO) { + @Override + public T execute(Connection connection, CommandObject commandObject) { + usedConnections.add(connection); + return (T) "keyless_result"; + } + @Override + protected void sleep(long ignored) { + throw new RuntimeException("This test should never sleep"); + } + }; + + // Execute multiple keyless commands to verify round-robin + testMe.executeKeylessCommand(STR_COM_OBJECT); + testMe.executeKeylessCommand(STR_COM_OBJECT); + testMe.executeKeylessCommand(STR_COM_OBJECT); + testMe.executeKeylessCommand(STR_COM_OBJECT); // Should cycle back to first + + // Verify round-robin behavior - should cycle through all connections + assertEquals(4, usedConnections.size()); + Set uniqueConnections = new HashSet<>(usedConnections); + assertEquals(3, uniqueConnections.size(), + "Round-robin should distribute across multiple nodes"); + } + + @Test + public void runKeylessCommandCircularCounterNeverOverflows() { + ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); + Map connectionMap = new HashMap<>(); + + // Create 3 pools to test circular behavior + ConnectionPool pool1 = mock(ConnectionPool.class); + ConnectionPool pool2 = mock(ConnectionPool.class); + ConnectionPool pool3 = mock(ConnectionPool.class); + + Connection connection1 = mock(Connection.class); + Connection connection2 = mock(Connection.class); + Connection connection3 = mock(Connection.class); + + connectionMap.put("node1:6379", pool1); + connectionMap.put("node2:6379", pool2); + connectionMap.put("node3:6379", pool3); + + when(connectionHandler.getConnectionMap()).thenReturn(connectionMap); + when(pool1.getResource()).thenReturn(connection1); + when(pool2.getResource()).thenReturn(connection2); + when(pool3.getResource()).thenReturn(connection3); + + ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO) { + @Override + public T execute(Connection connection, CommandObject commandObject) { + return (T) "keyless_result"; + } + @Override + protected void sleep(long ignored) { + throw new RuntimeException("This test should never sleep"); + } + }; + + // Execute many commands to test circular behavior + // With our implementation using getAndUpdate(current -> (current + 1) % nodeCount), + // the counter never exceeds nodeCount-1, so overflow is impossible + for (int i = 0; i < 100; i++) { + String result = testMe.executeKeylessCommand(STR_COM_OBJECT); + assertEquals("keyless_result", result); + } + + // Verify that getConnectionMap() was called for each execution + verify(connectionHandler, times(100)).getConnectionMap(); + + // The circular counter implementation ensures no overflow can occur + // because the counter value is always between 0 and (nodeCount-1) + } + + @Test + public void runKeylessCommandEvenDistributionRoundRobin() { + ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); + Map connectionMap = new HashMap<>(); + + // Create 4 pools to test even distribution + ConnectionPool pool1 = mock(ConnectionPool.class); + ConnectionPool pool2 = mock(ConnectionPool.class); + ConnectionPool pool3 = mock(ConnectionPool.class); + ConnectionPool pool4 = mock(ConnectionPool.class); + + Connection connection1 = mock(Connection.class); + Connection connection2 = mock(Connection.class); + Connection connection3 = mock(Connection.class); + Connection connection4 = mock(Connection.class); + + // Use ordered map to ensure consistent iteration order for testing + connectionMap.put("node1:6379", pool1); + connectionMap.put("node2:6379", pool2); + connectionMap.put("node3:6379", pool3); + connectionMap.put("node4:6379", pool4); + + when(connectionHandler.getConnectionMap()).thenReturn(connectionMap); + when(pool1.getResource()).thenReturn(connection1); + when(pool2.getResource()).thenReturn(connection2); + when(pool3.getResource()).thenReturn(connection3); + when(pool4.getResource()).thenReturn(connection4); + + // Track connection usage count + Map connectionUsage = new HashMap<>(); + connectionUsage.put(connection1, 0); + connectionUsage.put(connection2, 0); + connectionUsage.put(connection3, 0); + connectionUsage.put(connection4, 0); + + ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO) { + @Override + public T execute(Connection connection, CommandObject commandObject) { + connectionUsage.put(connection, connectionUsage.get(connection) + 1); + return (T) "keyless_result"; + } + @Override + protected void sleep(long ignored) { + throw new RuntimeException("This test should never sleep"); + } + }; + + // Execute commands - should be evenly distributed + int totalCommands = 40; // Multiple of 4 for perfect distribution + for (int i = 0; i < totalCommands; i++) { + testMe.executeKeylessCommand(STR_COM_OBJECT); + } + + // Verify even distribution - each node should get exactly 10 commands + int expectedPerNode = totalCommands / 4; + assertEquals(expectedPerNode, connectionUsage.get(connection1).intValue(), + "Node 1 should receive exactly " + expectedPerNode + " commands"); + assertEquals(expectedPerNode, connectionUsage.get(connection2).intValue(), + "Node 2 should receive exactly " + expectedPerNode + " commands"); + assertEquals(expectedPerNode, connectionUsage.get(connection3).intValue(), + "Node 3 should receive exactly " + expectedPerNode + " commands"); + assertEquals(expectedPerNode, connectionUsage.get(connection4).intValue(), + "Node 4 should receive exactly " + expectedPerNode + " commands"); + + // Verify total commands executed + int totalExecuted = connectionUsage.values().stream().mapToInt(Integer::intValue).sum(); + assertEquals(totalCommands, totalExecuted, "Total commands executed should match"); + + // Verify that getConnectionMap() was called for each execution + verify(connectionHandler, times(totalCommands)).getConnectionMap(); + } + + @Test + public void runKeylessCommandRoundRobinSequence() { + ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); + Map connectionMap = new HashMap<>(); + + // Create 3 pools for simpler sequence verification + ConnectionPool pool1 = mock(ConnectionPool.class); + ConnectionPool pool2 = mock(ConnectionPool.class); + ConnectionPool pool3 = mock(ConnectionPool.class); + + Connection connection1 = mock(Connection.class); + Connection connection2 = mock(Connection.class); + Connection connection3 = mock(Connection.class); + + // Use LinkedHashMap to ensure consistent iteration order + connectionMap = new java.util.LinkedHashMap<>(); + connectionMap.put("node1:6379", pool1); + connectionMap.put("node2:6379", pool2); + connectionMap.put("node3:6379", pool3); + + when(connectionHandler.getConnectionMap()).thenReturn(connectionMap); + when(pool1.getResource()).thenReturn(connection1); + when(pool2.getResource()).thenReturn(connection2); + when(pool3.getResource()).thenReturn(connection3); + + // Track the exact sequence of connections used + List connectionSequence = new ArrayList<>(); + + ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO) { + @Override + public T execute(Connection connection, CommandObject commandObject) { + if (connection == connection1) { + connectionSequence.add("node1"); + } else if (connection == connection2) { + connectionSequence.add("node2"); + } else if (connection == connection3) { + connectionSequence.add("node3"); + } + return (T) "keyless_result"; + } + @Override + protected void sleep(long ignored) { + throw new RuntimeException("This test should never sleep"); + } + }; + + // Execute 9 commands to see 3 complete cycles + for (int i = 0; i < 9; i++) { + testMe.executeKeylessCommand(STR_COM_OBJECT); + } + + // Verify the round-robin sequence + List expectedSequence = new ArrayList<>(); + expectedSequence.add("node1"); expectedSequence.add("node2"); expectedSequence.add("node3"); // First cycle + expectedSequence.add("node1"); expectedSequence.add("node2"); expectedSequence.add("node3"); // Second cycle + expectedSequence.add("node1"); expectedSequence.add("node2"); expectedSequence.add("node3"); // Third cycle + + assertEquals(expectedSequence, connectionSequence, + "Round-robin should follow exact sequence: node1 -> node2 -> node3 -> node1 -> ..."); + } } diff --git a/src/test/java/redis/clients/jedis/mocked/unified/UnifiedJedisSearchAndQueryCommandsTest.java b/src/test/java/redis/clients/jedis/mocked/unified/UnifiedJedisSearchAndQueryCommandsTest.java index 66a56001f2..7e486b71e9 100644 --- a/src/test/java/redis/clients/jedis/mocked/unified/UnifiedJedisSearchAndQueryCommandsTest.java +++ b/src/test/java/redis/clients/jedis/mocked/unified/UnifiedJedisSearchAndQueryCommandsTest.java @@ -49,13 +49,13 @@ public void testFtAliasAdd() { String expectedResponse = "OK"; when(commandObjects.ftAliasAdd(aliasName, indexName)).thenReturn(stringCommandObject); - when(commandExecutor.broadcastCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftAliasAdd(aliasName, indexName); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).broadcastCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftAliasAdd(aliasName, indexName); } @@ -65,13 +65,13 @@ public void testFtAliasDel() { String expectedResponse = "OK"; when(commandObjects.ftAliasDel(aliasName)).thenReturn(stringCommandObject); - when(commandExecutor.broadcastCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftAliasDel(aliasName); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).broadcastCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftAliasDel(aliasName); } @@ -82,13 +82,13 @@ public void testFtAliasUpdate() { String expectedResponse = "OK"; when(commandObjects.ftAliasUpdate(aliasName, indexName)).thenReturn(stringCommandObject); - when(commandExecutor.broadcastCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftAliasUpdate(aliasName, indexName); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).broadcastCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftAliasUpdate(aliasName, indexName); } @@ -99,13 +99,13 @@ public void testFtAlterWithSchema() { String expectedResponse = "OK"; when(commandObjects.ftAlter(indexName, schema)).thenReturn(stringCommandObject); - when(commandExecutor.broadcastCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftAlter(indexName, schema); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).broadcastCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftAlter(indexName, schema); } @@ -116,13 +116,13 @@ public void testFtAlterWithSchemaFields() { String expectedResponse = "OK"; when(commandObjects.ftAlter(indexName, schemaFields)).thenReturn(stringCommandObject); - when(commandExecutor.broadcastCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftAlter(indexName, schemaFields); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).broadcastCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftAlter(indexName, schemaFields); } @@ -202,13 +202,13 @@ public void testFtCreateWithOptionsAndSchema() { String expectedResponse = "OK"; when(commandObjects.ftCreate(indexName, indexOptions, schema)).thenReturn(stringCommandObject); - when(commandExecutor.broadcastCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftCreate(indexName, indexOptions, schema); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).broadcastCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftCreate(indexName, indexOptions, schema); } @@ -220,13 +220,13 @@ public void testFtCreateWithCreateParamsAndSchemaFields() { String expectedResponse = "OK"; when(commandObjects.ftCreate(indexName, createParams, schemaFields)).thenReturn(stringCommandObject); - when(commandExecutor.broadcastCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftCreate(indexName, createParams, schemaFields); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).broadcastCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftCreate(indexName, createParams, schemaFields); } @@ -272,13 +272,13 @@ public void testFtDictAdd() { long expectedResponse = 2L; when(commandObjects.ftDictAdd(dictionary, terms)).thenReturn(longCommandObject); - when(commandExecutor.executeCommand(longCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(longCommandObject)).thenReturn(expectedResponse); long result = jedis.ftDictAdd(dictionary, terms); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(longCommandObject); + verify(commandExecutor).executeKeylessCommand(longCommandObject); verify(commandObjects).ftDictAdd(dictionary, terms); } @@ -290,13 +290,13 @@ public void testFtDictAddBySampleKey() { long expectedResponse = 2L; when(commandObjects.ftDictAddBySampleKey(indexName, dictionary, terms)).thenReturn(longCommandObject); - when(commandExecutor.executeCommand(longCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(longCommandObject)).thenReturn(expectedResponse); long result = jedis.ftDictAddBySampleKey(indexName, dictionary, terms); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(longCommandObject); + verify(commandExecutor).executeKeylessCommand(longCommandObject); verify(commandObjects).ftDictAddBySampleKey(indexName, dictionary, terms); } @@ -307,13 +307,13 @@ public void testFtDictDel() { long expectedResponse = 1L; when(commandObjects.ftDictDel(dictionary, terms)).thenReturn(longCommandObject); - when(commandExecutor.executeCommand(longCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(longCommandObject)).thenReturn(expectedResponse); long result = jedis.ftDictDel(dictionary, terms); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(longCommandObject); + verify(commandExecutor).executeKeylessCommand(longCommandObject); verify(commandObjects).ftDictDel(dictionary, terms); } @@ -325,13 +325,13 @@ public void testFtDictDelBySampleKey() { long expectedResponse = 1L; when(commandObjects.ftDictDelBySampleKey(indexName, dictionary, terms)).thenReturn(longCommandObject); - when(commandExecutor.executeCommand(longCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(longCommandObject)).thenReturn(expectedResponse); long result = jedis.ftDictDelBySampleKey(indexName, dictionary, terms); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(longCommandObject); + verify(commandExecutor).executeKeylessCommand(longCommandObject); verify(commandObjects).ftDictDelBySampleKey(indexName, dictionary, terms); } @@ -341,13 +341,13 @@ public void testFtDictDump() { Set expectedResponse = new HashSet<>(Arrays.asList("term1", "term2")); when(commandObjects.ftDictDump(dictionary)).thenReturn(setStringCommandObject); - when(commandExecutor.executeCommand(setStringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(setStringCommandObject)).thenReturn(expectedResponse); Set result = jedis.ftDictDump(dictionary); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(setStringCommandObject); + verify(commandExecutor).executeKeylessCommand(setStringCommandObject); verify(commandObjects).ftDictDump(dictionary); } @@ -358,13 +358,13 @@ public void testFtDictDumpBySampleKey() { Set expectedResponse = new HashSet<>(Arrays.asList("term1", "term2")); when(commandObjects.ftDictDumpBySampleKey(indexName, dictionary)).thenReturn(setStringCommandObject); - when(commandExecutor.executeCommand(setStringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(setStringCommandObject)).thenReturn(expectedResponse); Set result = jedis.ftDictDumpBySampleKey(indexName, dictionary); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(setStringCommandObject); + verify(commandExecutor).executeKeylessCommand(setStringCommandObject); verify(commandObjects).ftDictDumpBySampleKey(indexName, dictionary); } @@ -374,13 +374,13 @@ public void testFtDropIndex() { String expectedResponse = "OK"; when(commandObjects.ftDropIndex(indexName)).thenReturn(stringCommandObject); - when(commandExecutor.broadcastCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftDropIndex(indexName); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).broadcastCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftDropIndex(indexName); } @@ -390,13 +390,13 @@ public void testFtDropIndexDD() { String expectedResponse = "OK"; when(commandObjects.ftDropIndexDD(indexName)).thenReturn(stringCommandObject); - when(commandExecutor.broadcastCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftDropIndexDD(indexName); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).broadcastCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftDropIndexDD(indexName); } @@ -407,13 +407,13 @@ public void testFtExplain() { String expectedResponse = "QUERY PLAN"; when(commandObjects.ftExplain(indexName, query)).thenReturn(stringCommandObject); - when(commandExecutor.executeCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftExplain(indexName, query); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftExplain(indexName, query); } @@ -424,13 +424,13 @@ public void testFtExplainCLI() { List expectedResponse = Arrays.asList("QUERY PLAN", "DETAILS"); when(commandObjects.ftExplainCLI(indexName, query)).thenReturn(listStringCommandObject); - when(commandExecutor.executeCommand(listStringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(listStringCommandObject)).thenReturn(expectedResponse); List result = jedis.ftExplainCLI(indexName, query); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(listStringCommandObject); + verify(commandExecutor).executeKeylessCommand(listStringCommandObject); verify(commandObjects).ftExplainCLI(indexName, query); } @@ -440,13 +440,13 @@ public void testFtInfo() { Map expectedResponse = Collections.singletonMap("index_definition", Collections.singletonMap("key_type", "HASH")); when(commandObjects.ftInfo(indexName)).thenReturn(mapStringObjectCommandObject); - when(commandExecutor.executeCommand(mapStringObjectCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(mapStringObjectCommandObject)).thenReturn(expectedResponse); Map result = jedis.ftInfo(indexName); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(mapStringObjectCommandObject); + verify(commandExecutor).executeKeylessCommand(mapStringObjectCommandObject); verify(commandObjects).ftInfo(indexName); } @@ -472,13 +472,13 @@ public void testFtSearch() { SearchResult expectedResponse = mock(SearchResult.class); when(commandObjects.ftSearch(indexName, query)).thenReturn(searchResultCommandObject); - when(commandExecutor.executeCommand(searchResultCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(searchResultCommandObject)).thenReturn(expectedResponse); SearchResult result = jedis.ftSearch(indexName, query); assertThat(result, sameInstance(expectedResponse)); - verify(commandExecutor).executeCommand(searchResultCommandObject); + verify(commandExecutor).executeKeylessCommand(searchResultCommandObject); verify(commandObjects).ftSearch(indexName, query); } @@ -490,13 +490,13 @@ public void testFtSearchWithParams() { SearchResult expectedResponse = mock(SearchResult.class); when(commandObjects.ftSearch(indexName, query, params)).thenReturn(searchResultCommandObject); - when(commandExecutor.executeCommand(searchResultCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(searchResultCommandObject)).thenReturn(expectedResponse); SearchResult result = jedis.ftSearch(indexName, query, params); assertThat(result, sameInstance(expectedResponse)); - verify(commandExecutor).executeCommand(searchResultCommandObject); + verify(commandExecutor).executeKeylessCommand(searchResultCommandObject); verify(commandObjects).ftSearch(indexName, query, params); } @@ -507,13 +507,13 @@ public void testFtSearchWithQueryObject() { SearchResult expectedResponse = mock(SearchResult.class); when(commandObjects.ftSearch(indexName, query)).thenReturn(searchResultCommandObject); - when(commandExecutor.executeCommand(searchResultCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(searchResultCommandObject)).thenReturn(expectedResponse); SearchResult result = jedis.ftSearch(indexName, query); assertThat(result, sameInstance(expectedResponse)); - verify(commandExecutor).executeCommand(searchResultCommandObject); + verify(commandExecutor).executeKeylessCommand(searchResultCommandObject); verify(commandObjects).ftSearch(indexName, query); } @@ -524,13 +524,13 @@ public void testFtSearchWithQueryObjectBinary() { SearchResult expectedResponse = mock(SearchResult.class); when(commandObjects.ftSearch(indexName, query)).thenReturn(searchResultCommandObject); - when(commandExecutor.executeCommand(searchResultCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(searchResultCommandObject)).thenReturn(expectedResponse); SearchResult result = jedis.ftSearch(indexName, query); assertThat(result, sameInstance(expectedResponse)); - verify(commandExecutor).executeCommand(searchResultCommandObject); + verify(commandExecutor).executeKeylessCommand(searchResultCommandObject); verify(commandObjects).ftSearch(indexName, query); } @@ -541,13 +541,13 @@ public void testFtSpellCheck() { Map> expectedResponse = Collections.singletonMap("term1", Collections.singletonMap("suggestion1", 1.0)); when(commandObjects.ftSpellCheck(index, query)).thenReturn(mapStringMapStringDoubleCommandObject); - when(commandExecutor.executeCommand(mapStringMapStringDoubleCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(mapStringMapStringDoubleCommandObject)).thenReturn(expectedResponse); Map> result = jedis.ftSpellCheck(index, query); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(mapStringMapStringDoubleCommandObject); + verify(commandExecutor).executeKeylessCommand(mapStringMapStringDoubleCommandObject); verify(commandObjects).ftSpellCheck(index, query); } @@ -559,13 +559,13 @@ public void testFtSpellCheckWithParams() { Map> expectedResponse = Collections.singletonMap("term1", Collections.singletonMap("suggestion1", 1.0)); when(commandObjects.ftSpellCheck(index, query, spellCheckParams)).thenReturn(mapStringMapStringDoubleCommandObject); - when(commandExecutor.executeCommand(mapStringMapStringDoubleCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(mapStringMapStringDoubleCommandObject)).thenReturn(expectedResponse); Map> result = jedis.ftSpellCheck(index, query, spellCheckParams); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(mapStringMapStringDoubleCommandObject); + verify(commandExecutor).executeKeylessCommand(mapStringMapStringDoubleCommandObject); verify(commandObjects).ftSpellCheck(index, query, spellCheckParams); } @@ -575,13 +575,13 @@ public void testFtSynDump() { Map> expectedResponse = Collections.singletonMap("group1", Arrays.asList("term1", "term2")); when(commandObjects.ftSynDump(indexName)).thenReturn(mapStringListStringCommandObject); - when(commandExecutor.executeCommand(mapStringListStringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(mapStringListStringCommandObject)).thenReturn(expectedResponse); Map> result = jedis.ftSynDump(indexName); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(mapStringListStringCommandObject); + verify(commandExecutor).executeKeylessCommand(mapStringListStringCommandObject); verify(commandObjects).ftSynDump(indexName); } @@ -593,13 +593,13 @@ public void testFtSynUpdate() { String expectedResponse = "OK"; when(commandObjects.ftSynUpdate(indexName, synonymGroupId, terms)).thenReturn(stringCommandObject); - when(commandExecutor.executeCommand(stringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(stringCommandObject)).thenReturn(expectedResponse); String result = jedis.ftSynUpdate(indexName, synonymGroupId, terms); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(stringCommandObject); + verify(commandExecutor).executeKeylessCommand(stringCommandObject); verify(commandObjects).ftSynUpdate(indexName, synonymGroupId, terms); } @@ -610,13 +610,13 @@ public void testFtTagVals() { Set expectedResponse = new HashSet<>(Arrays.asList("tag1", "tag2")); when(commandObjects.ftTagVals(indexName, fieldName)).thenReturn(setStringCommandObject); - when(commandExecutor.executeCommand(setStringCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(setStringCommandObject)).thenReturn(expectedResponse); Set result = jedis.ftTagVals(indexName, fieldName); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(setStringCommandObject); + verify(commandExecutor).executeKeylessCommand(setStringCommandObject); verify(commandObjects).ftTagVals(indexName, fieldName); } @@ -770,13 +770,13 @@ public void testFtProfileAggregate() { mock(AggregationResult.class), mock(ProfilingInfo.class)); when(commandObjects.ftProfileAggregate(indexName, profileParams, aggr)).thenReturn(entryAggregationResultMapStringObjectCommandObject); - when(commandExecutor.executeCommand(entryAggregationResultMapStringObjectCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(entryAggregationResultMapStringObjectCommandObject)).thenReturn(expectedResponse); Map.Entry result = jedis.ftProfileAggregate(indexName, profileParams, aggr); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(entryAggregationResultMapStringObjectCommandObject); + verify(commandExecutor).executeKeylessCommand(entryAggregationResultMapStringObjectCommandObject); verify(commandObjects).ftProfileAggregate(indexName, profileParams, aggr); } @@ -789,13 +789,13 @@ public void testFtProfileSearchWithQueryObject() { mock(SearchResult.class), mock(ProfilingInfo.class)); when(commandObjects.ftProfileSearch(indexName, profileParams, query)).thenReturn(entrySearchResultMapStringObjectCommandObject); - when(commandExecutor.executeCommand(entrySearchResultMapStringObjectCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(entrySearchResultMapStringObjectCommandObject)).thenReturn(expectedResponse); Map.Entry result = jedis.ftProfileSearch(indexName, profileParams, query); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(entrySearchResultMapStringObjectCommandObject); + verify(commandExecutor).executeKeylessCommand(entrySearchResultMapStringObjectCommandObject); verify(commandObjects).ftProfileSearch(indexName, profileParams, query); } @@ -809,13 +809,13 @@ public void testFtProfileSearchWithQueryAndSearchParams() { mock(SearchResult.class), mock(ProfilingInfo.class)); when(commandObjects.ftProfileSearch(indexName, profileParams, query, searchParams)).thenReturn(entrySearchResultMapStringObjectCommandObject); - when(commandExecutor.executeCommand(entrySearchResultMapStringObjectCommandObject)).thenReturn(expectedResponse); + when(commandExecutor.executeKeylessCommand(entrySearchResultMapStringObjectCommandObject)).thenReturn(expectedResponse); Map.Entry result = jedis.ftProfileSearch(indexName, profileParams, query, searchParams); assertThat(result, equalTo(expectedResponse)); - verify(commandExecutor).executeCommand(entrySearchResultMapStringObjectCommandObject); + verify(commandExecutor).executeKeylessCommand(entrySearchResultMapStringObjectCommandObject); verify(commandObjects).ftProfileSearch(indexName, profileParams, query, searchParams); } diff --git a/src/test/java/redis/clients/jedis/modules/search/AggregateIteratorTest.java b/src/test/java/redis/clients/jedis/modules/search/AggregateIteratorTest.java new file mode 100644 index 0000000000..4271064820 --- /dev/null +++ b/src/test/java/redis/clients/jedis/modules/search/AggregateIteratorTest.java @@ -0,0 +1,350 @@ +package redis.clients.jedis.modules.search; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.provider.MethodSource; + +import redis.clients.jedis.DefaultJedisClientConfig; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Protocol; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.UnifiedJedis; + +import redis.clients.jedis.search.Document; +import redis.clients.jedis.search.IndexOptions; +import redis.clients.jedis.search.Schema; +import redis.clients.jedis.search.aggr.*; + +@ParameterizedClass +@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") +public class AggregateIteratorTest { + + private static final String index = "aggiteratorindex"; + private static final String address = System.getProperty("modulesDocker", Protocol.DEFAULT_HOST + ':' + 6479); + private static final HostAndPort hnp = HostAndPort.from(address); + + private final RedisProtocol protocol; + private Jedis jedis; + private UnifiedJedis client; + + public AggregateIteratorTest(RedisProtocol redisProtocol) { + this.protocol = redisProtocol; + } + + @BeforeEach + public void setUp() { + jedis = new Jedis(hnp, DefaultJedisClientConfig.builder().protocol(protocol).build()); + jedis.flushAll(); + client = new UnifiedJedis(hnp, DefaultJedisClientConfig.builder().protocol(protocol).build()); + } + + @AfterEach + public void tearDown() throws Exception { + client.close(); + jedis.close(); + } + + private void addDocument(Document doc) { + String key = doc.getId(); + Map map = new LinkedHashMap<>(); + doc.getProperties().forEach(entry -> map.put(entry.getKey(), String.valueOf(entry.getValue()))); + client.hset(key, map); + } + + + + @Test + public void testAggregateIteratorBasicUsage() { + // Create index and add test data + Schema sc = new Schema(); + sc.addSortableTextField("name", 1.0); + sc.addSortableNumericField("count"); + client.ftCreate(index, IndexOptions.defaultOptions(), sc); + + addDocument(new Document("data1").set("name", "abc").set("count", 10)); + addDocument(new Document("data2").set("name", "def").set("count", 5)); + addDocument(new Document("data3").set("name", "def").set("count", 25)); + addDocument(new Document("data4").set("name", "ghi").set("count", 15)); + addDocument(new Document("data5").set("name", "jkl").set("count", 20)); + + // Create aggregation with cursor + AggregationBuilder aggr = new AggregationBuilder() + .groupBy("@name", Reducers.sum("@count").as("sum")) + .sortBy(10, SortedField.desc("@sum")) + .cursor(2, 10000); // 2 results per batch + + // Test the iterator using the integrated method + try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) { + assertTrue(iterator.hasNext()); + assertNotNull(iterator.getCursorId()); + + int totalBatches = 0; + int totalRows = 0; + + while (iterator.hasNext()) { + AggregationResult batch = iterator.next(); + assertNotNull(batch); + assertNotNull(batch.getRows()); + assertTrue(batch.getRows().size() <= 2); // Batch size should not exceed cursor count + totalBatches++; + totalRows += batch.getRows().size(); + } + + assertTrue(totalBatches > 0); + assertEquals(4, totalRows); // Should have 4 groups total (abc, def, ghi, jkl) + assertFalse(iterator.hasNext()); + } + } + + @Test + @SuppressWarnings("resource") // Expected to throw exception before resource is created + public void testAggregateIteratorWithoutCursor() { + // Create aggregation without cursor - should throw exception + AggregationBuilder aggr = new AggregationBuilder() + .groupBy("@name", Reducers.sum("@count").as("sum")); + + assertThrows(IllegalArgumentException.class, () -> { + client.ftAggregateIterator(index, aggr); + }); + } + + @Test + public void testAggregateIteratorSingleBatch() { + // Create index and add test data + Schema sc = new Schema(); + sc.addSortableTextField("name", 1.0); + sc.addSortableNumericField("count"); + client.ftCreate(index, IndexOptions.defaultOptions(), sc); + + addDocument(new Document("data1").set("name", "abc").set("count", 10)); + addDocument(new Document("data2").set("name", "def").set("count", 5)); + + // Create aggregation with large cursor count (all results in one batch) + AggregationBuilder aggr = new AggregationBuilder() + .groupBy("@name", Reducers.sum("@count").as("sum")) + .sortBy(10, SortedField.desc("@sum")) + .cursor(100, 10000); // Large batch size + + // Test the iterator using the integrated method + try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) { + assertTrue(iterator.hasNext()); + + AggregationResult batch = iterator.next(); + assertNotNull(batch); + assertNotNull(batch.getRows()); + assertEquals(2, batch.getRows().size()); // Should have 2 groups (abc, def) + + // Should be no more batches + assertFalse(iterator.hasNext()); + } + } + + @Test + public void testAggregateIteratorFirstBatchReturnsInitialResults() { + // Create index and add test data + Schema sc = new Schema(); + sc.addSortableTextField("name", 1.0); + sc.addSortableNumericField("count"); + client.ftCreate(index, IndexOptions.defaultOptions(), sc); + + addDocument(new Document("data1").set("name", "abc").set("count", 10)); + addDocument(new Document("data2").set("name", "def").set("count", 5)); + addDocument(new Document("data3").set("name", "def").set("count", 25)); + + // Create aggregation with cursor that should return 2 results in first batch + AggregationBuilder aggr = new AggregationBuilder() + .groupBy("@name", Reducers.sum("@count").as("sum")) + .sortBy(10, SortedField.desc("@sum")) + .cursor(2, 10000); // 2 results per batch + + // Test that first next() call returns the initial FT.AGGREGATE results + try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) { + assertTrue(iterator.hasNext()); + + // First call should return initial results from FT.AGGREGATE + AggregationResult firstBatch = iterator.next(); + assertNotNull(firstBatch); + assertNotNull(firstBatch.getRows()); + assertEquals(2, firstBatch.getRows().size()); // Should have 2 groups (abc, def) + + // Verify the results are correct + List rows = firstBatch.getRows(); + assertEquals("def", rows.get(0).getString("name")); + assertEquals(30, rows.get(0).getLong("sum")); + assertEquals("abc", rows.get(1).getString("name")); + assertEquals(10, rows.get(1).getLong("sum")); + + // Should be no more batches since we got all results in first batch + AggregationResult secondBatch = iterator.next(); + assertEquals(0, secondBatch.getRows().size()); + } + } + + @Test + public void testAggregateIteratorEmptyResult() { + // Create index but add no data + Schema sc = new Schema(); + sc.addSortableTextField("name", 1.0); + sc.addSortableNumericField("count"); + client.ftCreate(index, IndexOptions.defaultOptions(), sc); + + // Create aggregation with cursor + AggregationBuilder aggr = new AggregationBuilder() + .groupBy("@name", Reducers.sum("@count").as("sum")) + .cursor(10, 10000); + + // Test the iterator with empty results using the integrated method + try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) { + // Should have no results + assertTrue(iterator.next().isEmpty()); + } + } + + @Test + public void testAggregateIteratorRemove() { + // Create index and add test data + Schema sc = new Schema(); + sc.addSortableTextField("name", 1.0); + sc.addSortableNumericField("count"); + client.ftCreate(index, IndexOptions.defaultOptions(), sc); + + addDocument(new Document("data1").set("name", "abc").set("count", 10)); + addDocument(new Document("data2").set("name", "def").set("count", 5)); + addDocument(new Document("data3").set("name", "def").set("count", 25)); + addDocument(new Document("data4").set("name", "ghi").set("count", 15)); + addDocument(new Document("data5").set("name", "jkl").set("count", 20)); + + // Create aggregation with cursor + AggregationBuilder aggr = new AggregationBuilder() + .groupBy("@name", Reducers.sum("@count").as("sum")) + .sortBy(10, SortedField.desc("@sum")) + .cursor(2, 10000); // 2 results per batch + + // Test remove() method + try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) { + assertTrue(iterator.hasNext()); + assertNotNull(iterator.getCursorId()); + assertTrue(iterator.getCursorId() > 0); + + // Get first batch + AggregationResult firstBatch = iterator.next(); + assertNotNull(firstBatch); + assertEquals(2, firstBatch.getRows().size()); + + // Should still have more results + assertTrue(iterator.hasNext()); + + // Remove the cursor - this should terminate the iteration + iterator.remove(); + + // After remove, should have no more results + assertFalse(iterator.hasNext()); + assertEquals(Long.valueOf(-1), iterator.getCursorId()); + + // Calling next() should throw NoSuchElementException + assertThrows(NoSuchElementException.class, iterator::next); + } + } + + @Test + public void testAggregateIteratorRemoveBeforeNext() { + // Create index and add test data + Schema sc = new Schema(); + sc.addSortableTextField("name", 1.0); + sc.addSortableNumericField("count"); + client.ftCreate(index, IndexOptions.defaultOptions(), sc); + + addDocument(new Document("data1").set("name", "abc").set("count", 10)); + addDocument(new Document("data2").set("name", "cde").set("count", 8)); + + // Create aggregation with cursor + AggregationBuilder aggr = new AggregationBuilder() + .groupBy("@name", Reducers.sum("@count").as("sum")) + .cursor(1, 10000); + + // Test calling remove() before next() - should work since cursor is initialized + try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) { + assertTrue(iterator.hasNext()); + assertTrue(iterator.getCursorId() > 0); + + // Remove without calling next() first + iterator.remove(); + + // After remove, should have no more results + assertFalse(iterator.hasNext()); + assertEquals(Long.valueOf(-1), iterator.getCursorId()); + } + } + + @Test + public void testAggregateIteratorRemoveAfterClose() { + // Create index and add test data + Schema sc = new Schema(); + sc.addSortableTextField("name", 1.0); + sc.addSortableNumericField("count"); + client.ftCreate(index, IndexOptions.defaultOptions(), sc); + + addDocument(new Document("data1").set("name", "abc").set("count", 10)); + + // Create aggregation with cursor + AggregationBuilder aggr = new AggregationBuilder() + .groupBy("@name", Reducers.sum("@count").as("sum")) + .cursor(10, 10000); + + AggregateIterator iterator = client.ftAggregateIterator(index, aggr); + assertTrue(iterator.hasNext()); + + // Close the iterator + iterator.close(); + + // Calling remove() after close should not throw exception, just return silently + iterator.remove(); // Should not throw + assertEquals(Long.valueOf(-1), iterator.getCursorId()); + } + + @Test + public void testAggregateIteratorRemoveMultipleTimes() { + // Create index and add test data + Schema sc = new Schema(); + sc.addSortableTextField("name", 1.0); + sc.addSortableNumericField("count"); + client.ftCreate(index, IndexOptions.defaultOptions(), sc); + + addDocument(new Document("data1").set("name", "abc").set("count", 10)); + addDocument(new Document("data2").set("name", "cde").set("count", 3)); + + // Create aggregation with cursor + AggregationBuilder aggr = new AggregationBuilder() + .groupBy("@name", Reducers.sum("@count").as("sum")) + .cursor(1, 10000); + + // Test calling remove() multiple times + try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) { + assertTrue(iterator.hasNext()); + + // First remove should work + iterator.remove(); + assertFalse(iterator.hasNext()); + assertEquals(-1L, iterator.getCursorId()); + + // Second remove should not throw exception, just return silently + iterator.remove(); // Should not throw + assertEquals(-1L, iterator.getCursorId()); + } + } +}