Skip to content

Request response policies #2996

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/lua-multi-incr.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const client = createClient({
scripts: {
mincr: defineScript({
NUMBER_OF_KEYS: 2,
// TODO add RequestPolicy: ,
SCRIPT:
'return {' +
'redis.pcall("INCRBY", KEYS[1], ARGV[1]),' +
Expand Down
8 changes: 8 additions & 0 deletions packages/client/lib/client/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ export class BasicCommandParser implements CommandParser {
return tmp.join('_');
}

get commandName(): string | undefined {
let cmdName = this.#redisArgs[0];
if (cmdName instanceof Buffer) {
return cmdName.toString();
}
return cmdName;
}

push(...arg: Array<RedisArgument>) {
this.#redisArgs.push(...arg);
};
Expand Down
19 changes: 19 additions & 0 deletions packages/client/lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/
import { BasicCommandParser } from '../client/parser';
import { ASKING_CMD } from '../commands/ASKING';
import SingleEntryCache from '../single-entry-cache'
import { POLICIES, PolicyResolver, StaticPolicyResolver } from './request-response-policies';
interface ClusterCommander<
M extends RedisModules,
F extends RedisFunctions,
Expand Down Expand Up @@ -192,6 +193,7 @@ export default class RedisCluster<
parser.firstKey,
command.IS_READ_ONLY,
this._commandOptions,
parser.commandName!,
(client, opts) => client._executeCommand(command, parser, opts, transformReply)
);
};
Expand All @@ -208,6 +210,7 @@ export default class RedisCluster<
parser.firstKey,
command.IS_READ_ONLY,
this._self._commandOptions,
parser.commandName!,
(client, opts) => client._executeCommand(command, parser, opts, transformReply)
);
};
Expand All @@ -226,6 +229,7 @@ export default class RedisCluster<
parser.firstKey,
fn.IS_READ_ONLY,
this._self._commandOptions,
parser.commandName!,
(client, opts) => client._executeCommand(fn, parser, opts, transformReply)
);
};
Expand All @@ -244,6 +248,7 @@ export default class RedisCluster<
parser.firstKey,
script.IS_READ_ONLY,
this._commandOptions,
parser.commandName!,
(client, opts) => client._executeScript(script, parser, opts, transformReply)
);
};
Expand Down Expand Up @@ -299,6 +304,7 @@ export default class RedisCluster<

private _self = this;
private _commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>;
private _policyResolver: PolicyResolver;

/**
* An array of the cluster slots, each slot contain its `master` and `replicas`.
Expand Down Expand Up @@ -356,6 +362,8 @@ export default class RedisCluster<
if (options?.commandOptions) {
this._commandOptions = options.commandOptions;
}

this._policyResolver = new StaticPolicyResolver(POLICIES);
}

duplicate<
Expand Down Expand Up @@ -454,9 +462,19 @@ export default class RedisCluster<
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
options: ClusterCommandOptions | undefined,
commandName: string,
fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, opts?: ClusterCommandOptions) => Promise<T>
): Promise<T> {

const maxCommandRedirections = this._options.maxCommandRedirections ?? 16;
const policyResult = this._policyResolver.resolvePolicy(commandName)

if(policyResult.ok) {
//TODO
} else {
//TODO
}

let client = await this._slots.getClient(firstKey, isReadonly);
let i = 0;

Expand Down Expand Up @@ -512,6 +530,7 @@ export default class RedisCluster<
firstKey,
isReadonly,
options,
args[0] instanceof Buffer ? args[0].toString() : args[0],
(client, opts) => client.sendCommand(args, opts)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// import { RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from "../../RESP/types";
// import { ShardNode } from "../cluster-slots";
// import type { Either } from './types';

// export interface CommandRouter<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts,
// RESP extends RespVersions,
// TYPE_MAPPING extends TypeMapping> {
// routeCommand(
// command: string,
// policy: RequestPolicy,
// ): Either<ShardNode<M, F, S, RESP, TYPE_MAPPING>, 'no-available-nodes' | 'routing-failed'>;
// }
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import type { CommandReply } from '../../commands/generic-transformers';
import type { CommandPolicies } from './policies-constants';
import { REQUEST_POLICIES_WITH_DEFAULTS, RESPONSE_POLICIES_WITH_DEFAULTS } from './policies-constants';
import type { PolicyResolver, ModulePolicyRecords } from './types';
import { StaticPolicyResolver } from './static-policy-resolver';

/**
* Function type that returns command information from Redis
*/
export type CommandFetcher = () => Promise<Array<CommandReply>>;

/**
* A factory for creating policy resolvers that dynamically build policies based on the Redis server's COMMAND response.
*
* This factory fetches command information from Redis and analyzes the response to determine
* appropriate routing policies for each command, returning a StaticPolicyResolver with the built policies.
*/
export class DynamicPolicyResolverFactory {
/**
* Creates a StaticPolicyResolver by fetching command information from Redis
* and building appropriate policies based on the command characteristics.
*
* @param commandFetcher Function to fetch command information from Redis
* @param fallbackResolver Optional fallback resolver to use when policies are not found
* @returns A new StaticPolicyResolver with the fetched policies
*/
static async create(
commandFetcher: CommandFetcher,
fallbackResolver?: PolicyResolver
): Promise<PolicyResolver> {
const commands = await commandFetcher();
const policies: ModulePolicyRecords = {};

for (const command of commands) {
const parsed = DynamicPolicyResolverFactory.#parseCommandName(command.name);

// Skip commands with invalid format (more than one dot)
if (!parsed) {
continue;
}

const { moduleName, commandName } = parsed;

// Initialize module if it doesn't exist
if (!policies[moduleName]) {
policies[moduleName] = {};
}

// Determine policies for this command
const commandPolicies = DynamicPolicyResolverFactory.#buildCommandPolicies(command);
policies[moduleName][commandName] = commandPolicies;
}

return new StaticPolicyResolver(policies, fallbackResolver);
}

/**
* Parses a command name to extract module and command components.
*
* Redis commands can be in format:
* - "ping" -> module: "std", command: "ping"
* - "ft.search" -> module: "ft", command: "search"
*
* Commands with more than one dot are invalid.
*/
static #parseCommandName(fullCommandName: string): { moduleName: string; commandName: string } | null {
const parts = fullCommandName.split('.');

if (parts.length === 1) {
return { moduleName: 'std', commandName: fullCommandName };
}

if (parts.length === 2) {
return { moduleName: parts[0], commandName: parts[1] };
}

// Commands with more than one dot are invalid in Redis
return null;
}

/**
* Builds CommandPolicies for a command based on its characteristics.
*
* Priority order:
* 1. Use explicit policies from the command if available
* 2. Classify as DEFAULT_KEYLESS if keySpecification is empty
* 3. Classify as DEFAULT_KEYED if keySpecification is not empty
*/
static #buildCommandPolicies(command: CommandReply): CommandPolicies {
// Determine if command is keyless based on keySpecification
const isKeyless = command.isKeyless

// Determine default policies based on key specification
const defaultRequest = isKeyless
? REQUEST_POLICIES_WITH_DEFAULTS.DEFAULT_KEYLESS
: REQUEST_POLICIES_WITH_DEFAULTS.DEFAULT_KEYED;
const defaultResponse = isKeyless
? RESPONSE_POLICIES_WITH_DEFAULTS.DEFAULT_KEYLESS
: RESPONSE_POLICIES_WITH_DEFAULTS.DEFAULT_KEYED;

let subcommands: Record<string, CommandPolicies> | undefined;
if(command.subcommands.length > 0) {
subcommands = {};
for (const subcommand of command.subcommands) {

// Subcommands are in format "parentCommand|subcommand"
const parts = subcommand.name.split("\|")
if(parts.length !== 2) {
throw new Error(`Invalid subcommand name: ${subcommand.name}`);
}
const subcommandName = parts[1];

subcommands[subcommandName] = DynamicPolicyResolverFactory.#buildCommandPolicies(subcommand);
}
}

return {
request: command.policies.request ?? defaultRequest,
response: command.policies.response ?? defaultResponse,
isKeyless,
subcommands
};
}
}
Loading
Loading