Add framework for cross-slot commands, enable for MGET #1986
base: unstable
Conversation
This PR is based on the following feedback provided by the community:
Note that this PR will reject an otherwise valid cross-slot
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

    @@           Coverage Diff            @@
    ##           unstable    #1986   +/-  ##
    ============================================
    - Coverage     71.04%   71.04%   -0.01%
    ============================================
      Files           123      123
      Lines         66113    66135      +22
    ============================================
    + Hits          46972    46987      +15
    - Misses        19141    19148       +7
@valkey-io/core-team PTAL
Pull Request Overview
This PR adds support for cross-slot commands by introducing the CMD_CROSS_SLOT flag and associated client flags to allow commands like MGET to operate on keys from different slots if they reside on the same shard. Key changes include the definition of the CMD_CROSS_SLOT flag and new client flags, updates to cluster node lookup and redirection logic in cluster.c, and adjustments to client flag handling and error messaging in related modules.
Reviewed Changes
Copilot reviewed 7 out of 10 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| src/server.h | Adds the `CMD_CROSS_SLOT` flag and updates `ClientFlags`, along with inline documentation. |
| src/server.c | Adjusts command processing to account for the `lazy_expire_disabled` flag during execution. |
| src/script.c | Updates script error messaging to include a cross-shard redirection error code. |
| src/networking.c | Resets the new client flags in the `resetClient` routine. |
| src/db.c | Modifies `getKeySlot` logic to conditionally bypass hash slot checks when `cross_slot` is enabled. |
| src/cluster.h | Introduces a new cluster error code, `CLUSTER_REDIR_CROSS_SHARD`. |
| src/cluster.c | Extends `getNodeByQuery` logic to support cross-slot commands and adjusts client redirection. |
Files not reviewed (3)
- src/commands.def: Language not supported
- src/commands/mget.json: Language not supported
- tests/unit/cluster/cross-slots.tcl: Language not supported
Comments suppressed due to low confidence (2)
src/db.c:261

- [nitpick] Review the new condition to ensure it correctly distinguishes between primary and cross-slot behavior. Consider adding an inline comment to clarify that the `cross_slot` flag bypasses the usual hash slot assertion.

      !server.current_client->flag.cross_slot && !server.current_client->flag.primary) {

src/cluster.c:1111

- [nitpick] Consider adding an inline comment explaining that setting the `cross_slot` and `lazy_expire_disabled` flags indicates the request involves keys across different slots on the same node, thus requiring relaxed slot checks.

      c->flag.cross_slot = 1;
src/cluster.c (outdated)

        /* Return the hashslot by reference. */
    -   if (hashslot) *hashslot = slot;
    +   if (hashslot) *hashslot = first_slot;
Slot statistics are updated based on this information; we will bias towards `first_slot`.
thanks for the reminder on slot stats :)
the bias actually comes from `ioThreadReadQueryFromClient`, but now I think there is more work to make this change fully compatible with slot-stats
    @@ -2413,6 +2415,8 @@ typedef int serverGetKeysProc(struct serverCommand *cmd, robj **argv, int argc,
         * CMD_TOUCHES_ARBITRARY_KEYS: The command may touch (and cause lazy-expire)
         * arbitrary key (i.e not provided in argv)
         *
    +    * CMD_CROSS_SLOT: The command may access keys in different slots.
We would need to expose this in the module API, presumably, since there is a JSON command for multi-gets that can fan out: JSON.MGET.
    @@ -258,7 +258,7 @@ int getKeySlot(sds key) {
         * so we must always recompute the slot for commands coming from the primary.
         */
        if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command &&
    -       !server.current_client->flag.primary) {
    +       !server.current_client->flag.cross_slot && !server.current_client->flag.primary) {
We've used `->slot = -1` to indicate it's cross-slot in the past. We could also reserve `-2` for this purpose. This allows us to avoid taking another flag.
    @@ -1202,7 +1234,9 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
         * node we want to mention in the redirection. Moreover hashslot should
         * be set to the hash slot that caused the redirection. */
        void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
    -       if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
    +       if (error_code == CLUSTER_REDIR_CROSS_SHARD) {
    +           addReplyError(c, "-CROSSSHARD Keys in request don't hash to the same shard");
Will make the comment here, since I think this is the main point of discussion. So the client experience will be:

1. Clients will be updated so that they can decompose MGET into per-shard MGETs and send them out.
2. If the clients are updated and get a CROSSSHARD error, they will realize they need to discover topology again and then recompose the commands. If they aren't updated, they'll just throw the error up. So, technically a breaking change since we are changing the error code! (CLIENT CAPA mitigates this if we want.)
3. The framework easily works on fan-out commands (as mentioned: DEL, UNLINK, EXISTS, TOUCH, MSET, MGET). For commands that aren't fan-out (SUNIONSTORE, for example), we'll have issues with atomic slot migration in replicating the cross-key dependencies, since we can't replicate verbatim as the target won't have the other keys. Clients also don't have trivial strategies for merging, so maybe we'll never have a need for that.
4. Clients break atomicity when doing this fan-out. I suppose our suggestion to clients will be to expose a way in each client to handle this special behavior (`cross_shard_mget()` vs `mget()`, or an argument when creating a client to introduce the fan-out option). If the client just exposes a raw API, it's up to the end user to orchestrate the fan-out, I suppose.

My other suggestion was that we should introduce a new caching-focused command, MXGET, that never throws the CROSSSHARD error. If a server receives that command and it doesn't serve the slots associated with it, it'll just return empty values for them.

My only strong opinion is the slot migration problem in point 3; everything else in this PR seems OK to me.
I like adding a new command like MXGET; we can make a more convenient interface this way. In particular, having per-slot or, even better, per-key errors would make usage easier for the cluster case.
Per key errors? So we can include an error in an array reply? That's interesting. I'm not sure any other command does that. Not even EXEC, I think.
> If the clients are updated and get a CROSSSHARD error, they will realize they need to discover topology again and then recompose the commands. If they aren't updated, they'll just throw the error up. So, technically a breaking change since we are changing the error code! (CLIENT CAPA mitigates this if we want)

Yes, it's a breaking change in that respect, and based on previous discussions (regarding admins controlling the server settings while developers aren't aware of those), this should be something the developer opts in to via CLIENT CAPA or a new command.

A side note on error handling: based on Snap's experience running with this mode for over a year, this usually happens during scale-up or scale-down, when slot assignment is changing at a fairly fast rate, so fast that just refreshing the topology isn't sufficient. The best way we have been able to mitigate this is by converting the retry to fall back to pipelined GETs or a single-slot MGET. A new command that can return partial data would be great.
Correct, we don't have multiple errors right now.
The typical use case in the distributed world is when we ask for multiple keys that are actually randomly spread across multiple slots and machines. We don't care about atomicity there, which is something MGET is designed for.
Here, we want to be able to combine multiple requests into one to save a lot of compute.
But I also want to be able to get partial results, so that even if a slot is migrated/migrating I still get data for all the keys that are there.
I don't want to call this cell again and redo all the parsing, etc.
In some applications I don't even care about getting a full response and can skip keys in a moved slot altogether.
Should we return the data that is stable? I.e., if we have 4 of the 8 slots from the request, we return the data from those 4 slots and then give some indication that the data was unavailable for the remaining ones?
exactly
> Clients will be updated so that they can decompose MGET into per shard MGETs and send them out.

Yes, regardless of the interface design/implementation this needs client support, though some clients are easier than others since they already split keys by slots today, including valkey-go and Lettuce.

> If the clients are updated and get a CROSSSHARD error, they will realize they need to discover topology again and then recompose the commands.

I don't think this is the case. For clients that don't slice the keys (by slots), they could get CROSS_SLOT already, which will be handled by the application. With this new behavior of MGET (assuming we are going with this interface), CROSS_SHARD could be returned, so there could be some confusion during development, but that shouldn't happen in production.

> The framework easily works on fan out commands. (As mentioned, DEL, UNLINK, EXISTS, TOUCH, MSET, MGET) The commands that aren't fan out (SUNIONSTORE for example),

Yes, there will be more work for `WRITE`, but I am convinced now that `WRITE` is not desired. We can revisit this framework if there is a legit use case; I think it just needs incremental work and we can keep the work in this PR.

> Clients break atomicity when doing this fan out.

It is already the case for some clients like valkey-go/Lettuce, which I checked.
> My other suggestion was that we should introduce a new caching focused command MXGET

Not a hard blocker for me. I am open to this suggestion.

> per-key errors would make usage easier for the cluster case.

> The best way we have been able to mitigate this is by converting the re-try to fall back to pipelined gets or single slot mget. A new command that can return partial data would be great.

Interesting - would returning an empty string back work in this case? I think we would need a new data type (meaning modifying the RESP protocol) if we would like to introduce a per-key error. It is not clean to use a special string to indicate errors.

> Yes, it's a breaking change in that respect and based on previous discussions (regarding admins may control the server settings and developers not being aware of those), this should be a thing that developer opts in via CLIENT CAPA or a new command.

I thought about opting in via CLIENT CAPA earlier in the other thread, but I feel the value isn't much. Please see my comments above. Also, if we are in favor of a new command, this becomes a moot point.
I just took a short look at this PR; my question is how does a client enable or disable this feature? Or is it enabled by default?
Introduces the `CMD_CROSS_SLOT` flag to allow specific commands to operate on keys residing in multiple slots, provided those slots are managed by the same shard.

Key changes:

- Added `CMD_CROSS_SLOT` command flag and corresponding `cross_slot` client flag.
- Modified `getNodeByQuery` in `cluster.c` to:
  - Check for the `CMD_CROSS_SLOT` flag.
  - Allow keys in different slots if they belong to the same node.
  - Introduce `CLUSTER_REDIR_CROSS_SHARD` error if keys span multiple nodes.
  - Prevent cross-slot operations if any involved slot is migrating/importing.
- Introduced `lazy_expire_disabled` client flag to prevent lazy expiration during cross-slot command execution.
- Updated `MGET` command definition to use the new `CMD_CROSS_SLOT` flag.
- Adjusted related logic (client reset, script error handling, key slot checks) to accommodate the new flags and behavior.

This lays the groundwork for easily enabling cross-slot semantics for other commands in the future while enabling it for MGET immediately.

Signed-off-by: Ping Xie <pingxie@google.com>
Explicit requirements
High level decisions for discussion
@valkey-io/core-team
Thanks Ping for the analysis! The current IO threads design is indeed not optimized for pipelining. It parses one command from each client and hands it over to the main thread. The main thread executes the parsed command and then, if there is more in the input buffer, the main thread parses and executes the rest of the pipeline.

If IO-threads are disabled, the main thread prioritizes pipelines and doesn't interleave them with commands from other clients, which is good for pipelines and transactions, but probably still not good enough to compete with cross-slot MGET.

There is a follow-up idea for IO-threading to let the IO thread parse more than one command: #761, the bullet "Parsing offload to parse multiple commands". I created a specific issue for it now.

Even with IO-threads disabled, we can do "look-ahead" of pipelined commands by peeking into the input buffer. If we detect that there is another GET or MGET command in the pipeline, we could parse it and prefetch the keys while we execute the current command.

There is a feature plan to allow executing readonly commands in parallel in different threads if the commands are operating on different slots: #2022. It takes advantage of the fact that each command operates on a single slot.
The key contributor here is actually the memory prefetching. In my test, with memory prefetching enabled just for the first slot in a cross-slot MGET command, which is what happens in this PR, its performance (in terms of keys per second) is pretty much on par with that of pipelined GETs. That being said, you are right that being able to parse more than one command is the prerequisite for effective prefetching. BTW, I wonder if we really need to pay for the complexity of "look-ahead" - I feel that all we need is to parse the entire qbuf into a command queue maintained on the client struct and then ensure the memory prefetching logic pops one command off of that queue at a time.
I also wonder if we need to parse the whole query buffer, or just the next command. If we prefetch the data for the next command while we execute the current one, it might resolve to similar amounts of time.
Yeah, this is the better solution. It may require some more refactoring of the parsing code though.
To get maximum benefit, I believe we should parse a few commands ahead, so we can prefetch a batch of ~16 keys, whether it's a single MGET command or 16 GET/SET/HSET/SADD commands.

The prefetching is an iterative process of 3-4 steps. After doing the first step on each key lookup (e.g. prefetching the hashtable bucket), we need to wait for the memory to be loaded before we can do the next step (e.g. prefetching the key of a potential match before we check if it's a match). When we prefetch in batches, we pay the cost of the cold memory access once per batch instead of once per key. But we can also gain if we can always be a few steps ahead of the execution: when we execute a command, we do one step of prefetching on keys of the commands that are waiting in the queue. In this way, we can have everything prefetched if we just prefetch 3-4 commands ahead.

If you look at the recent optimization of the hashtable iterator, it uses this technique. It prefetches a few steps ahead of the element returned by hashtableNext(). See #1501.
Maybe we should be thinking of the entire system as more like a pipeline of commands, just like a CPU does. The main command execution thread has a series of commands it needs to execute, in various stages of readiness to execute:
Complex operations like multi-exec might need more prefetching stages. The actual set of commands could still be maintained on the client.