Add framework for cross-slot commands, enable for MGET #1986


Open · wants to merge 3 commits into base: unstable

Conversation

PingXie (Member) commented Apr 22, 2025

Introduces the CMD_CROSS_SLOT flag to allow specific commands to operate on keys residing in multiple slots, provided those slots are managed by the same shard.

Key changes:

  • Added CMD_CROSS_SLOT command flag and corresponding cross_slot client flag.
  • Modified getNodeByQuery in cluster.c to:
    • Check for the CMD_CROSS_SLOT flag.
    • Allow keys in different slots if they belong to the same node.
    • Introduce CLUSTER_REDIR_CROSS_SHARD error if keys span multiple nodes.
    • Prevent cross-slot operations if any involved slot is migrating/importing.
  • Introduced lazy_expire_disabled client flag to prevent lazy expiration during cross-slot command execution.
  • Updated MGET command definition to use the new CMD_CROSS_SLOT flag.
  • Adjusted related logic (client reset, script error handling, key slot checks) to accommodate the new flags and behavior.

This lays the groundwork for easily enabling cross-slot semantics for other commands in the future while enabling it for MGET immediately.
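To make the shape of the check concrete, here is a minimal, self-contained sketch. The CMD_CROSS_SLOT and CLUSTER_REDIR_* names come from this PR; the ClusterView type, the numeric values, and the choice of CLUSTER_REDIR_UNSTABLE for the migrating case are simplified assumptions, not the literal cluster.c implementation:

```c
#include <stdbool.h>
#include <stddef.h>

/* Redirection outcomes; names mirror cluster.h, values are illustrative. */
enum {
    CLUSTER_REDIR_NONE = 0,
    CLUSTER_REDIR_CROSS_SLOT,  /* keys in multiple slots, command not flagged */
    CLUSTER_REDIR_CROSS_SHARD, /* new in this PR: slots owned by different shards */
    CLUSTER_REDIR_UNSTABLE,    /* a touched slot is migrating/importing */
};

/* Stand-in for cluster state; the real code consults server.cluster->slots[],
 * migrating_slots_to[] and importing_slots_from[]. */
typedef struct {
    int owner_of_slot[16384]; /* shard id owning each slot */
    bool unstable[16384];     /* slot is migrating or importing */
} ClusterView;

/* Decide whether a command touching slots[0..nkeys) may run on one shard. */
static int crossSlotCheck(const ClusterView *cv, const int *slots,
                          size_t nkeys, bool cmd_cross_slot) {
    int first_slot = slots[0];
    int owner = cv->owner_of_slot[first_slot];
    bool multi_slot = false;

    for (size_t i = 1; i < nkeys; i++) {
        if (slots[i] == first_slot) continue;
        if (!cmd_cross_slot) return CLUSTER_REDIR_CROSS_SLOT;
        if (cv->owner_of_slot[slots[i]] != owner) return CLUSTER_REDIR_CROSS_SHARD;
        multi_slot = true;
    }
    if (multi_slot) {
        /* Reject while a legacy slot migration is in flight; the planned
         * atomic slot migration (#23) would lift this restriction. */
        for (size_t i = 0; i < nkeys; i++) {
            if (cv->unstable[slots[i]]) return CLUSTER_REDIR_UNSTABLE;
        }
    }
    return CLUSTER_REDIR_NONE;
}
```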

PingXie (Member, Author) commented Apr 22, 2025

This PR is based on the following feedback provided by the community:

  1. Cross-slot MGET provides a significant performance boost (validated);
  2. Cross-slot WRITE is messy and not interesting;
  3. The value of supporting cross-slot for commands other than MGET isn't clear;
  4. Cross-slot READ shouldn't have WRITE side effects, because those complicate replication;
  5. We can start with MGET, but it should be easy to extend to other commands if needed without significantly increasing the command surface;

Note that this PR will reject an otherwise valid cross-slot MGET command if one of its slots is being migrated, but this restriction applies only to the legacy slot migration. It will work transparently with the planned atomic slot migration (#23).

codecov bot commented Apr 22, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 71.04%. Comparing base (5df95b3) to head (15047ce).
Report is 2 commits behind head on unstable.

Additional details and impacted files
@@             Coverage Diff              @@
##           unstable    #1986      +/-   ##
============================================
- Coverage     71.04%   71.04%   -0.01%     
============================================
  Files           123      123              
  Lines         66113    66135      +22     
============================================
+ Hits          46972    46987      +15     
- Misses        19141    19148       +7     
Files with missing lines Coverage Δ
src/cluster.c 90.66% <100.00%> (+0.64%) ⬆️
src/commands.def 100.00% <ø> (ø)
src/db.c 89.58% <100.00%> (+<0.01%) ⬆️
src/networking.c 87.30% <100.00%> (+<0.01%) ⬆️
src/script.c 87.69% <ø> (ø)
src/server.c 87.94% <100.00%> (+0.03%) ⬆️
src/server.h 100.00% <ø> (ø)

... and 14 files with indirect coverage changes


PingXie (Member, Author) commented Apr 22, 2025

@valkey-io/core-team PTAL

PingXie requested a review from Copilot, April 22, 2025 06:02

Copilot AI left a comment

Pull Request Overview

This PR adds support for cross-slot commands by introducing the CMD_CROSS_SLOT flag and associated client flags to allow commands like MGET to operate on keys from different slots if they reside on the same shard. Key changes include the definition of the CMD_CROSS_SLOT flag and new client flags, updates to cluster node lookup and redirection logic in cluster.c, and adjustments to client flag handling and error messaging in related modules.

Reviewed Changes

Copilot reviewed 7 out of 10 changed files in this pull request and generated no comments.

Summary per file:
  • src/server.h: Adds the CMD_CROSS_SLOT flag and updates ClientFlags along with inline documentation.
  • src/server.c: Adjusts command processing to account for the lazy_expire_disabled flag during execution.
  • src/script.c: Updates script error messaging to include a cross-shard redirection error code.
  • src/networking.c: Resets the new client flags in the resetClient routine.
  • src/db.c: Modifies getKeySlot logic to conditionally bypass hash slot checks when cross_slot is enabled.
  • src/cluster.h: Introduces a new cluster error code, CLUSTER_REDIR_CROSS_SHARD.
  • src/cluster.c: Extends getNodeByQuery logic to support cross-slot commands and adjusts client redirection.
Files not reviewed (3)
  • src/commands.def: Language not supported
  • src/commands/mget.json: Language not supported
  • tests/unit/cluster/cross-slots.tcl: Language not supported
Comments suppressed due to low confidence (2)

src/db.c:261

  • [nitpick] Review the new condition to ensure it correctly distinguishes between primary and cross-slot behavior. Consider adding an inline comment to clarify that the cross_slot flag bypasses the usual hash slot assertion.
        !server.current_client->flag.cross_slot && !server.current_client->flag.primary) {

src/cluster.c:1111

  • [nitpick] Consider adding an inline comment explaining that setting the cross_slot and lazy_expire_disabled flags indicates the request involves keys across different slots on the same node, thus requiring relaxed slot checks.
                        c->flag.cross_slot = 1;

zuiderkwast moved this to Idea in Roadmap, Apr 22, 2025
src/cluster.c (outdated diff)

     /* Return the hashslot by reference. */
-    if (hashslot) *hashslot = slot;
+    if (hashslot) *hashslot = first_slot;
Member

Slot statistics are updated based on this information; we will bias towards first_slot.

Member Author

thanks for the reminder on slot stats :)

The bias actually comes from ioThreadReadQueryFromClient, but now I think there is more work to make this change fully compatible with slot-stats.

@@ -2413,6 +2415,8 @@ typedef int serverGetKeysProc(struct serverCommand *cmd, robj **argv, int argc,
  * CMD_TOUCHES_ARBITRARY_KEYS: The command may touch (and cause lazy-expire)
  * arbitrary key (i.e. not provided in argv)
  *
+ * CMD_CROSS_SLOT: The command may access keys in different slots.
Member

We would presumably need to expose this in the module API, since there is a JSON command, JSON.MGET, that does multi-get fan-out.

@@ -258,7 +258,7 @@ int getKeySlot(sds key) {
  * so we must always recompute the slot for commands coming from the primary.
  */
 if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command &&
-    !server.current_client->flag.primary) {
+    !server.current_client->flag.cross_slot && !server.current_client->flag.primary) {
Member

We've used ->slot = -1 to indicate it's cross slot in the past. We could also reserve -2 for this purpose. This allows us to avoid taking another flag.
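A tiny sketch of that alternative, with purely illustrative names (not code from this PR):

```c
/* Reuse c->slot sentinels instead of adding a cross_slot client flag. */
#define SLOT_UNSET      (-1) /* existing: no slot cached for this client */
#define SLOT_CROSS_SLOT (-2) /* proposed: command spans multiple slots */

/* getKeySlot() could then key off the sentinel instead of a new flag: */
static inline int cachedSlotUsable(int slot, int is_primary) {
    return slot >= 0 && !is_primary; /* both sentinels force a recompute */
}
```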

@@ -1202,7 +1234,9 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* node we want to mention in the redirection. Moreover hashslot should
* be set to the hash slot that caused the redirection. */
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
if (error_code == CLUSTER_REDIR_CROSS_SHARD) {
addReplyError(c, "-CROSSSHARD Keys in request don't hash to the same shard");
Member

Will make the comment here, since I think this is the main point of discussion. So the client experience will be:

  1. Clients will be updated so that they can decompose MGET into per-shard MGETs and send them out (sketched after this comment).
  2. If the clients are updated and get a CROSSSHARD error, they will realize they need to discover the topology again and then recompose the commands. If they aren't updated, they'll just throw the error up. So, technically a breaking change since we are changing the error code! (CLIENT CAPA mitigates this if we want.)
  3. The framework easily works for fan-out commands (as mentioned: DEL, UNLINK, EXISTS, TOUCH, MSET, MGET). For the commands that aren't fan-out (SUNIONSTORE, for example), we'll have issues with atomic slot migration in replicating the cross-key dependencies, since we can't replicate verbatim as the target won't have the other keys. Clients also don't have trivial strategies for merging, so maybe we'll never have a need for that.
  4. Clients break atomicity when doing this fan-out. I suppose our suggestion to clients will be to expose a way in each client to handle this special behavior (cross_shard_mget() vs mget(), or an argument when creating a client to introduce the fan-out option). If the client just exposes a raw API, it's up to the end user to orchestrate the fan-out.

My other suggestion was that we should introduce a new caching focused command MXGET, that never throws the CROSSSHARD error. If a server receives that command, and it doesn't serve the slots associated with it, it'll just return empty values for it.

My only strong opinion is the slot migration problem in 3, everything else in this PR seems OK to me.
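As a hedged illustration of the decomposition in point 1, here is a sketch of just the routing step. keyHashSlot is the hash-tag-aware CRC16 slot function from src/cluster.c; the KeyRoute model and the slot_owner mapping are hypothetical client-side constructs:

```c
#include <stddef.h>

/* Declared here so the sketch compiles; defined in src/cluster.c. */
unsigned int keyHashSlot(char *key, int keylen);

/* Hypothetical client-side model: where each key of one logical MGET goes. */
typedef struct {
    int shard;  /* shard owning the key's slot */
    size_t idx; /* position in the original MGET, for reply reassembly */
} KeyRoute;

/* Bucket the keys of one MGET by owning shard. slot_owner[] maps each of the
 * 16384 slots to a shard id (as learned from CLUSTER SHARDS/SLOTS). The
 * caller then issues one MGET per distinct shard and scatters the replies
 * back into their original positions, preserving MGET's reply order. */
void routeMgetKeys(char **keys, const int *keylens, size_t nkeys,
                   const int *slot_owner, KeyRoute *routes) {
    for (size_t i = 0; i < nkeys; i++) {
        unsigned int slot = keyHashSlot(keys[i], keylens[i]);
        routes[i].shard = slot_owner[slot];
        routes[i].idx = i;
    }
}
```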


I like adding a new command like MXGET; we can make a more convenient interface this way. In particular, having per-slot or, even better, per-key errors would make usage easier for the cluster case.

Contributor

Per-key errors? So we can include an error in an array reply? That's interesting. I'm not sure any other command does that. Not even EXEC, I think.

ovaiskhansc commented Apr 23, 2025

> If the clients are updated and get a CROSSSHARD error, they will realize they need to discover the topology again and then recompose the commands. If they aren't updated, they'll just throw the error up. So, technically a breaking change since we are changing the error code! (CLIENT CAPA mitigates this if we want.)

Yes, it's a breaking change in that respect, and based on previous discussions (regarding admins controlling server settings without developers being aware of them), this should be something the developer opts into via CLIENT CAPA or a new command.

A side note on error handling: based on Snap's experience running in this mode for over a year, this usually happens during scale-up or scale-down, when slot assignment is changing so fast that just refreshing the topology isn't sufficient. The best way we have been able to mitigate this is by converting the retry into a fallback to pipelined GETs or single-slot MGETs. A new command that can return partial data would be great.


Correct, we don't have multiple errors right now.
The typical use case in the distributed world is requesting multiple keys that are randomly spread across multiple slots and machines. We don't care about atomicity there, which is what MGET is designed for.
Here, we want to be able to combine multiple requests into one to save a lot of compute.
But I also want to be able to get partial results, so even if a slot is migrated or migrating, I still get the data for all keys that are there.
I don't want to make this call again and do all the parsing, etc.
In some applications I don't even care about getting a full response and can skip keys in a moved slot altogether.

madolson (Member) commented Apr 24, 2025

Should we return the data that is stable? I.e., if we have 4 of the 8 slots from the request, we return the data from those 4 slots and then give some indication that the data was unavailable for the remaining ones?


exactly

Member Author

> Clients will be updated so that they can decompose MGET into per-shard MGETs and send them out.

Yes, regardless of the interface design/implementation, this needs client support, though some clients are easier than others since they already split keys by slot today, including Valkey-go and Lettuce.

> If the clients are updated and get a CROSSSHARD error, they will realize they need to discover the topology again and then recompose the commands.

I don't think this is the case. Clients that don't slice the keys (by slot) could already get CROSSSLOT, which is handled by the application. With this new behavior of MGET (assuming we are going with this interface), CROSSSHARD could be returned, so there could be some confusion during development, but that shouldn't happen in production.

> The framework easily works for fan-out commands (as mentioned: DEL, UNLINK, EXISTS, TOUCH, MSET, MGET). For the commands that aren't fan-out (SUNIONSTORE, for example),

Yes, there will be more work for WRITE, but I am convinced now that WRITE is not desired. We can revisit this framework if there is a legit use case; I think it just needs incremental work, and we can keep the work in this PR.

> Clients break atomicity when doing this fan-out.

This is already the case for some clients like Valkey-go/Lettuce, which I checked.

> My other suggestion was that we should introduce a new caching-focused command MXGET

Not a hard blocker for me. I am open to this suggestion.

> Per-key errors would make usage easier for the cluster case.
> The best way we have been able to mitigate this is by converting the retry into a fallback to pipelined GETs or single-slot MGETs. A new command that can return partial data would be great.

Interesting - would returning an empty string in this case work? I think we would need a new data type (meaning modifying the RESP protocol) if we wanted to introduce a per-key error. It is not clean to use a special string to indicate errors.

> Yes, it's a breaking change in that respect, and based on previous discussions, this should be something the developer opts into via CLIENT CAPA or a new command.

I thought about opting in via CLIENT CAPA earlier in the other thread, but I feel that the value isn't much. Please see my comments above.

Also, if we are in favor of a new command, this becomes a moot point.

hwware (Member) commented Apr 22, 2025

I just took a quick look at this PR; my question is, how does a client enable and disable this feature? Or is it enabled by default?

PingXie added the major-decision-pending (Major decision pending by TSC team), client-changes-needed (Client changes may be required for this feature), release-notes (This issue should get a line item in the release notes), and needs-doc-pr (This change needs to update a documentation page; remove label once doc PR is open) labels, Apr 22, 2025
PingXie added 2 commits April 25, 2025 22:12
Introduces the `CMD_CROSS_SLOT` flag to allow specific commands
to operate on keys residing in multiple slots, provided those slots
are managed by the same shard.

Key changes:
- Added `CMD_CROSS_SLOT` command flag and corresponding `cross_slot` client flag.
- Modified `getNodeByQuery` in `cluster.c` to:
    - Check for the `CMD_CROSS_SLOT` flag.
    - Allow keys in different slots if they belong to the same node.
    - Introduce `CLUSTER_REDIR_CROSS_SHARD` error if keys span multiple nodes.
    - Prevent cross-slot operations if any involved slot is migrating/importing.
- Introduced `lazy_expire_disabled` client flag to prevent lazy expiration
  during cross-slot command execution.
- Updated `MGET` command definition to use the new `CMD_CROSS_SLOT` flag.
- Adjusted related logic (client reset, script error handling, key slot checks)
  to accommodate the new flags and behavior.

This lays the groundwork for easily enabling cross-slot semantics for
other commands in the future while enabling it for MGET immediately.

Signed-off-by: Ping Xie <pingxie@google.com>
Change-Id: I6d8ca2ebc326b7ca71be074af51514f7496a2569
Signed-off-by: Ping Xie <pingxie@google.com>
operations.

Change-Id: Ifba90d5b792c311365e58257c9d61af1b7050d29
Signed-off-by: Ping Xie <pingxie@google.com>
PingXie (Member, Author) commented Apr 28, 2025

Explicit requirements

  1. performance is the goal; atomicity is not required
  2. MGET is the only command of interest for now; however, we should be able to introduce cross-slot to other "read" commands easily with this work
  3. "write" is not desired/needed in the foreseeable future but the work in this PR should allow the support for cross-slot "write" commands to be introduced in a compatible experience for developers
  4. need to support module
  5. need to be compatible with existing engine features, including slot level metrics and memory prefetching

High-level decisions for discussion

  1. expanding MGET vs introducing a new command?
  2. if new command, what should the new command be called? how to create an easily identifiable naming pattern to help reduce the learning curve for the developers?
  3. should we give the client a say about whether it'd like to work with unstable slots or not?
  4. if we support unstable slots, what should be returned if the requested key is not found in the unstable slot? an error or {}? if error, what data type should be used?
  5. in the redirection case, should we introduce a new -ASK/-MOVED error that allows all the slots in question to be returned? otherwise, the client can update one slot's owning shard at a time upon receiving the existing -ASK/-MOVED error. it would take the client N retries to catch up with the cluster topology updates, where N is the number of slots in the original cross-slot request.
  6. how do we divide up the resources, cpu/network/memory, consumed by a cross-slot mget?

@valkey-io/core-team

PingXie (Member, Author) commented May 5, 2025

I did some more digging on pipelining. I think the core issue is that only the first command in a pipeline batch benefits from memory prefetching. The remaining ones all run with a cold cache on the main thread (and get parsed on the main thread as well).

This is the flame chart that illustrates these two points, indirectly: 1) the presence of processInputBuffer on the main thread; 2) the much wider margin on the IO threads, indicating that the IO threads are merely burning cycles without doing real work.

[flame chart: pipelined GETs]

Here is the MGET flame chart for comparison.

[flame chart: cross-slot MGET]

I will see if I can get the more definitive (v)PMU counters next.

zuiderkwast (Contributor) commented May 5, 2025

Thanks Ping for the analysis!

The current IO threads design is indeed not optimized for pipelining. It parses one command from each client and hands it over to the main thread. The main thread executes the parsed command and then, if there is more in the input buffer, the main thread parses it and executes the rest of the pipeline. We can see this in processPendingCommandAndInputBuffer().

In processInputBuffer() there is a while-loop that reads, parses and executes all the available commands in the pipeline. I believe this explains what you see in the flame graph.

If IO threads are disabled, the main thread prioritizes pipelines and doesn't interleave them with commands from other clients, which is good for pipelines and transactions, but probably still not good enough to compete with cross-slot MGET.

There is a follow-up idea for IO-threading to let the IO thread parse more than one command: #761, the bullet "Parsing offload to parse multiple commands". I created a specific issue for it now:

Even with IO-threads disabled, we can do "look-ahead" of pipelined commands by peeking into the input buffer. If we detect that there is another GET or MGET command in the pipeline, we could parse it and prefetch the keys while we execute the current command.

There is a feature plan to allow executing readonly commands in parallel in different threads if the commands are operating on different slots: #2022. It takes advantage of the fact that each command operates on a single slot.

PingXie (Member, Author) commented May 5, 2025

The key contributor here is actually the memory prefetching. In my test, enabling memory prefetching just for the first slot in a cross-slot MGET command, which is what happens in this PR, puts its performance (in terms of keys per second) pretty much on par with that of pipelined GETs. That said, you are right that being able to parse more than one command is the prerequisite for effective prefetching.

BTW, I wonder if we really need to pay for the complexity of "look-ahead" - I feel that all we need is to parse the entire qbuf into a command queue maintained on the client struct and then ensure the memory prefetching logic pops commands off that queue one at a time.
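A hedged sketch of that queue idea, with hypothetical names (the real client struct in server.h differs):

```c
#include <stddef.h>

typedef struct ParsedCommand {
    int argc;
    void **argv;                /* robj* in the real server */
    struct ParsedCommand *next;
} ParsedCommand;

typedef struct {
    ParsedCommand *head, *tail; /* FIFO of commands parsed out of qbuf */
} CommandQueue;

/* One iteration of the idea: keep prefetch running over everything still
 * queued, then pop and execute exactly one command. */
void processOneQueued(CommandQueue *q,
                      void (*prefetchKeys)(ParsedCommand *),
                      void (*execute)(ParsedCommand *)) {
    for (ParsedCommand *c = q->head; c != NULL; c = c->next) {
        prefetchKeys(c); /* warm hashtable buckets/objects ahead of execution */
    }
    ParsedCommand *c = q->head;
    if (c == NULL) return;
    q->head = c->next;
    if (q->head == NULL) q->tail = NULL;
    execute(c);
}
```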

madolson (Member) commented May 5, 2025

> BTW, I wonder if we really need to pay for the complexity of "look-ahead" - I feel that all we need is to parse the entire qbuf into a command queue maintained on the client struct and then ensure the memory prefetching logic pops commands off that queue one at a time.

I also wonder if we need to parse the whole query buffer, or just the next command. If we prefetch the data for the next command while we execute the current one, it might resolve to similar amounts of time.

zuiderkwast (Contributor)

> BTW, I wonder if we really need to pay for the complexity of "look-ahead" - I feel that all we need is to parse the entire qbuf into a command queue maintained on the client struct and then ensure the memory prefetching logic pops commands off that queue one at a time.

Yeah, this is the better solution. It may require some more refactoring of the parsing code though.

> I also wonder if we need to parse the whole query buffer, or just the next command. If we prefetch the data for the next command while we execute the current one, it might resolve to similar amounts of time.

To get maximum benefit, I believe we should parse a few commands ahead, so we can prefetch a batch of ~16 keys, whether it's a single MGET command or 16 GET/SET/HSET/SADD commands.

The prefetching is an iterative process of 3-4 steps. After doing the first step on each key lookup (e.g. prefetching the hashtable bucket), we need to wait for the memory to be loaded before we can do the next step (e.g. prefetching the key of a potential match before we check if it's a match). When we prefetch in batches, we pay the cost of the cold memory access once per batch instead of once per key. But we can also gain if we can always be a few steps ahead of the execution: When we execute a command, we do one step of prefetching on keys of the commands that are waiting in the queue. In this way, we can have everything prefetched if we just prefetch 3-4 commands ahead.

If you look at the recent optimization of the hashtable iterator, it uses this technique. It prefetches a few steps ahead of the element returned by hashtableNext(). See #1501.
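A compact sketch of that batched, stepwise prefetch; all names here are illustrative stand-ins, not the actual prefetching code or the hashtable iterator referenced above:

```c
#include <stddef.h>

/* Steps of one key lookup, in dependency order: the address touched at each
 * step is only known after the previous step's memory has loaded. */
typedef enum { PF_BUCKET, PF_ENTRY, PF_VALUE, PF_DONE } PrefetchStep;

typedef struct {
    PrefetchStep step;
    void *addr; /* address discovered by the previous step */
} PrefetchState;

/* One pass over a batch of ~16 keys: advance every key a single step, so the
 * cache-miss latency of all keys in the batch overlaps. nextAddr() stands in
 * for the per-step pointer chase (bucket -> entry -> value). Call repeatedly
 * (e.g. once per executed command) until every entry reaches PF_DONE. */
void prefetchBatchStep(PrefetchState *batch, size_t n,
                       void *(*nextAddr)(PrefetchState *)) {
    for (size_t i = 0; i < n; i++) {
        if (batch[i].step == PF_DONE) continue;
        batch[i].addr = nextAddr(&batch[i]); /* reads memory warmed last pass */
        if (batch[i].addr) __builtin_prefetch(batch[i].addr);
        batch[i].step++;
    }
}
```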

madolson (Member) commented May 5, 2025

> BTW, I wonder if we really need to pay for the complexity of "look-ahead" - I feel that all we need is to parse the entire qbuf into a command queue maintained on the client struct and then ensure the memory prefetching logic pops commands off that queue one at a time.

Maybe we should be thinking of the entire system as more like a pipeline of commands, just like a CPU does. The main command execution thread has a series of commands it needs to execute, in various stages of readiness:

Head: SET FOO BAR; FOO has hash bucket and serverObject prefetched. Execute!
Queued Command 1: GET FOO2; FOO2 has hash bucket prefetched. Prefetch serverObject.
Queued Command 2: GET FOO3; FOO3 has nothing prefetched. Prefetch hash bucket.
Queued Command 3: GET FOO4; FOO4 has nothing prefetched. Pipeline is full; don't start processing this command yet.

Complex operations like MULTI/EXEC might need more prefetching stages. The actual set of commands could still be maintained on the client.
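A rough sketch of that staged pipeline, under the same caveat that every name here is hypothetical:

```c
#include <stddef.h>

#define PF_DEPTH 4 /* how far ahead of the head we keep prefetching */

/* Stages roughly matching the example above. */
typedef enum { ST_PARSED, ST_BUCKET_PF, ST_OBJECT_PF, ST_READY } Stage;

typedef struct {
    int argc;
    void **argv; /* robj* in the real server */
    Stage stage;
} PipelinedCmd;

/* One tick of the pipeline: execute the head if its memory is warm, then
 * advance the next few queued commands one prefetch stage each. advance()
 * issues the stage's prefetch and bumps cmd->stage; execute() runs it. */
void pipelineTick(PipelinedCmd *q, size_t *len,
                  void (*advance)(PipelinedCmd *),
                  void (*execute)(PipelinedCmd *)) {
    if (*len > 0 && q[0].stage == ST_READY) {
        execute(&q[0]);
        for (size_t i = 1; i < *len; i++) q[i - 1] = q[i]; /* shift queue */
        (*len)--;
    }
    size_t limit = (*len < PF_DEPTH) ? *len : PF_DEPTH;
    for (size_t i = 0; i < limit; i++) {
        if (q[i].stage != ST_READY) advance(&q[i]);
    }
}
```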

Labels: client-changes-needed, major-decision-pending, needs-doc-pr, release-notes
Projects: Roadmap (Status: Idea)
6 participants