Skip to content

Commit 2fe08f8

Browse files
xbasel, soloestoy, zuiderkwast, madolson, ranshid
authored
Add multi-database support to cluster mode (#1671)
## cluster: add multi-database support in cluster mode

Add multi-database support in cluster mode to align with standalone mode and facilitate migration. Previously, cluster mode was restricted to a single database (DB0). This change allows multiple databases while preserving the existing slot-based key distribution.

### Key Features:
- Database-agnostic hashing. The hashing algorithm is unchanged. Identical keys always map to the same slot across all databases, ensuring consistent key distribution and compatibility with existing single-database setups.
- Multi-DB command support. SELECT, MOVE, and COPY are now supported in cluster mode.
- Fully backward compatible with no API changes.
- SWAPDB is not supported in cluster mode. It is unsafe due to inconsistency risks.

### Command-Level Changes:
- SELECT / MOVE / COPY are now supported in cluster mode.
- MOVE, and COPY with a `DB` argument that targets a different database, are rejected with a TRYAGAIN error while the slot is being migrated or imported, to prevent multi-DB inconsistencies.
- SWAPDB will return an error if used when cluster mode is enabled.
- GETKEYSINSLOT, COUNTKEYSINSLOT and MIGRATE will operate in the context of the selected database. This means, for example, that migrating keys in a slot will require iterating and repeating across all databases.

### Slot Migration Process:
- Multi-DB support in cluster mode affects slot migration. Operators should now iterate over all the configured databases.

### Transaction Handling (MULTI/EXEC):
- getNodeByQuery key lookup behavior changed:
  - No key lookups when queuing commands in MULTI, only cross-slot validation.
  - Key lookups happen at EXEC time.
- SELECT inside MULTI/EXEC is now checked, ensuring key validation uses the selected DB at lookup.

### Valkey-cli:
- valkey-cli has been updated to support resharding across all databases.

### Configuration:
- Introduce new configuration `cluster-databases`. The new configuration controls the maximum number of databases in cluster mode.
Implements #1319

---------

Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com>
Signed-off-by: zhaozhao.zz <zhaozhao.zz@alibaba-inc.com>
Co-authored-by: zhaozhao.zz <zhaozhao.zz@alibaba-inc.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
1 parent 8dca801 commit 2fe08f8

15 files changed

+799
-119
lines changed

src/cluster.c

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -817,8 +817,16 @@ static int shouldReturnTlsInfo(void) {
817817
}
818818
}
819819

820+
/* Return the size reported by kvstoreHashtableSize() for hash slot 'hashslot'
 * in the key store of the single database 'db'. Other databases are not
 * consulted; use countKeysInSlot() for the total across all databases. */
unsigned int countKeysInSlotForDb(unsigned int hashslot, serverDb *db) {
821+
return kvstoreHashtableSize(db->keys, hashslot);
822+
}
823+
820824
/* Return the number of keys in hash slot 'slot' summed over all server.dbnum
 * databases. The removed line below counted only the first database
 * (server.db); the added loop adds up countKeysInSlotForDb() for each one. */
unsigned int countKeysInSlot(unsigned int slot) {
821-
return kvstoreHashtableSize(server.db->keys, slot);
825+
unsigned int result = 0;
826+
for (int i = 0; i < server.dbnum; i++) {
827+
result += countKeysInSlotForDb(slot, server.db + i);
828+
}
829+
return result;
822830
}
823831

824832
void clusterCommandHelp(client *c) {
@@ -900,7 +908,7 @@ void clusterCommand(client *c) {
900908
addReplyError(c, "Invalid slot");
901909
return;
902910
}
903-
addReplyLongLong(c, countKeysInSlot(slot));
911+
addReplyLongLong(c, countKeysInSlotForDb(slot, c->db));
904912
} else if (!strcasecmp(c->argv[1]->ptr, "getkeysinslot") && c->argc == 4) {
905913
/* CLUSTER GETKEYSINSLOT <slot> <count> */
906914
long long maxkeys, slot;
@@ -912,11 +920,11 @@ void clusterCommand(client *c) {
912920
return;
913921
}
914922

915-
unsigned int keys_in_slot = countKeysInSlot(slot);
923+
unsigned int keys_in_slot = countKeysInSlotForDb(slot, c->db);
916924
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
917925
addReplyArrayLen(c, numkeys);
918926
kvstoreHashtableIterator *kvs_di = NULL;
919-
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot, 0);
927+
kvs_di = kvstoreGetHashtableIterator(c->db->keys, slot, 0);
920928
for (unsigned int i = 0; i < numkeys; i++) {
921929
void *next;
922930
serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
@@ -1031,6 +1039,8 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
10311039
int pubsubshard_included =
10321040
(cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_PUBSUB));
10331041

1042+
serverDb *currentDb = c->db;
1043+
10341044
/* Check that all the keys are in the same hash slot, and obtain this
10351045
* slot and the node associated. */
10361046
for (i = 0; i < ms->count; i++) {
@@ -1048,6 +1058,16 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
10481058
numkeys = getKeysFromCommand(mcmd, margv, margc, &result);
10491059
keyindex = result.keys;
10501060

1061+
if (mcmd->proc == selectCommand) {
1062+
/* Failed SELECT is ignored since it doesn't modify the database. */
1063+
serverDb *origDb = currentDb;
1064+
long long id;
1065+
if (getLongLongFromObject(margv[1], &id) == C_OK && selectDb(c, id) == C_OK) {
1066+
currentDb = c->db;
1067+
selectDb(c, origDb->id);
1068+
}
1069+
}
1070+
10511071
for (j = 0; j < numkeys; j++) {
10521072
robj *thiskey = margv[keyindex[j].pos];
10531073
int thisslot = keyHashSlot((char *)thiskey->ptr, sdslen(thiskey->ptr));
@@ -1081,6 +1101,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
10811101
importing_slot = 1;
10821102
}
10831103
}
1104+
10841105
} else {
10851106
/* If it is not the first key/channel, make sure it is exactly
10861107
* the same key/channel as the first we saw. */
@@ -1097,15 +1118,39 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
10971118
}
10981119
}
10991120

1100-
/* Migrating / Importing slot? Count keys we don't have.
1121+
/* Block MOVE command as the destination key is not expected to exist, and we don't know if it was migrated */
1122+
if ((migrating_slot || importing_slot) && mcmd->proc == moveCommand) {
1123+
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
1124+
getKeysFreeResult(&result);
1125+
return NULL;
1126+
}
1127+
1128+
/* Block the COPY command if it's cross-DB to keep the code simple.
1129+
* Allowing cross-DB COPY is possible, but it would require looking up the second key in the target DB.
1130+
* The command should only be allowed if the key exists. We may revisit this decision in the future. */
/* margv[4] (the DB index) is read below, so at least 5 arguments are required;
 * a bare "COPY src dst DB" must not index past the end of margv.
 * NOTE(review): only argument 3 is inspected for "db" - confirm whether the
 * DB option may legally appear after other options and needs handling here. */
1131+
if ((migrating_slot || importing_slot) &&
1132+
mcmd->proc == copyCommand &&
1133+
margc >= 5 && !strcasecmp(margv[3]->ptr, "db")) {
1134+
long long value;
1135+
if (getLongLongFromObject(margv[4], &value) != C_OK || value != currentDb->id) {
1136+
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
1137+
getKeysFreeResult(&result);
1138+
return NULL;
1139+
}
1140+
}
1141+
1142+
/* Migrating / Importing slot? During exec we count keys we don't have.
11011143
* If it is pubsubshard command, it isn't required to check
11021144
* the channel being present or not in the node during the
11031145
* slot migration, the channel will be served from the source
11041146
* node until the migration completes with CLUSTER SETSLOT <slot>
11051147
* NODE <node-id>. */
11061148
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
1107-
if ((migrating_slot || importing_slot) && !pubsubshard_included) {
1108-
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
1149+
if ((migrating_slot || importing_slot) &&
1150+
!pubsubshard_included &&
1151+
(!c->flag.multi || (c->flag.multi && cmd->proc == execCommand)) // Multi/Exec validation happens on exec
1152+
) {
1153+
if (lookupKeyReadWithFlags(currentDb, thiskey, flags) == NULL)
11091154
missing_keys++;
11101155
else
11111156
existing_keys++;

src/cluster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ int detectAndUpdateCachedNodeHealth(void);
113113
client *createCachedResponseClient(int resp);
114114
void deleteCachedResponseClient(client *recording_client);
115115
void clearCachedClusterSlotsResponse(void);
116+
unsigned int countKeysInSlotForDb(unsigned int hashslot, serverDb *db);
116117
unsigned int countKeysInSlot(unsigned int hashslot);
117118
int getSlotOrReply(client *c, robj *o);
118119

src/cluster_legacy.c

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5878,11 +5878,6 @@ int verifyClusterConfigWithData(void) {
58785878
* completely depend on the replication stream. */
58795879
if (nodeIsReplica(myself)) return C_OK;
58805880

5881-
/* Make sure we only have keys in DB0. */
5882-
for (j = 1; j < server.dbnum; j++) {
5883-
if (kvstoreSize(server.db[j].keys)) return C_ERR;
5884-
}
5885-
58865881
/* Check that all the slots we see populated memory have a corresponding
58875882
* entry in the cluster table. Otherwise fix the table. */
58885883
for (j = 0; j < CLUSTER_SLOTS; j++) {
@@ -6542,29 +6537,31 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
65426537
server.server_del_keys_in_slot = 1;
65436538
unsigned int j = 0;
65446539

6545-
kvstoreHashtableIterator *kvs_di = NULL;
6546-
void *next;
6547-
kvs_di = kvstoreGetHashtableIterator(server.db->keys, hashslot, HASHTABLE_ITER_SAFE);
6548-
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
6549-
robj *valkey = next;
6550-
enterExecutionUnit(1, 0);
6551-
sds sdskey = objectGetKey(valkey);
6552-
robj *key = createStringObject(sdskey, sdslen(sdskey));
6553-
dbDelete(&server.db[0], key);
6554-
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
6555-
signalModifiedKey(NULL, &server.db[0], key);
6556-
/* The keys are not actually logically deleted from the database, just moved to another node.
6557-
* The modules needs to know that these keys are no longer available locally, so just send the
6558-
* keyspace notification to the modules, but not to clients. */
6559-
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
6560-
exitExecutionUnit();
6561-
postExecutionUnitOperations();
6562-
decrRefCount(key);
6563-
j++;
6564-
server.dirty++;
6540+
/* Delete the keys of 'hashslot' from every database, one database at a time. */
for (int i = 0; i < server.dbnum; i++) {
6541+
kvstoreHashtableIterator *kvs_di = NULL;
6542+
void *next;
6543+
/* Use a pointer to the live entry in server.db, not a by-value copy of the
 * struct: the calls below take a serverDb pointer and must see the real
 * database object rather than a temporary on the stack. */
serverDb *db = &server.db[i];
6544+
kvs_di = kvstoreGetHashtableIterator(db->keys, hashslot, HASHTABLE_ITER_SAFE);
6545+
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
6546+
robj *valkey = next;
6547+
enterExecutionUnit(1, 0);
6548+
sds sdskey = objectGetKey(valkey);
6549+
robj *key = createStringObject(sdskey, sdslen(sdskey));
6550+
dbDelete(db, key);
6551+
propagateDeletion(db, key, server.lazyfree_lazy_server_del);
6552+
signalModifiedKey(NULL, db, key);
6553+
/* The keys are not actually logically deleted from the database, just moved to another node.
6554+
* The modules needs to know that these keys are no longer available locally, so just send the
6555+
* keyspace notification to the modules, but not to clients. */
6556+
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
6557+
exitExecutionUnit();
6558+
postExecutionUnitOperations();
6559+
decrRefCount(key);
6560+
j++;
6561+
server.dirty++;
6562+
}
6563+
kvstoreReleaseHashtableIterator(kvs_di);
65656564
}
6566-
kvstoreReleaseHashtableIterator(kvs_di);
6567-
65686565
server.server_del_keys_in_slot = 0;
65696566
serverAssert(server.execution_nesting == 0);
65706567
return j;
@@ -7033,7 +7030,7 @@ int clusterCommandSpecial(client *c) {
70337030
}
70347031
} else if (!strcasecmp(c->argv[1]->ptr, "flushslots") && c->argc == 2) {
70357032
/* CLUSTER FLUSHSLOTS */
7036-
if (kvstoreSize(server.db[0].keys) != 0) {
7033+
if (!dbHasNoKeys()) {
70377034
addReplyError(c, "DB must be empty to perform CLUSTER FLUSHSLOTS.");
70387035
return 1;
70397036
}
@@ -7203,7 +7200,7 @@ int clusterCommandSpecial(client *c) {
72037200
/* If the instance is currently a primary, it should have no assigned
72047201
* slots nor keys to accept to replicate some other node.
72057202
* Replicas can switch to another primary without issues. */
7206-
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) {
7203+
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !dbHasNoKeys())) {
72077204
addReplyError(c, "To set a master the node must be empty and "
72087205
"without assigned slots.");
72097206
return 1;
@@ -7355,7 +7352,7 @@ int clusterCommandSpecial(client *c) {
73557352

73567353
/* Replicas can be reset while containing data, but not primary nodes
73577354
* that must be empty. */
7358-
if (clusterNodeIsPrimary(myself) && kvstoreSize(c->db->keys) != 0) {
7355+
if (clusterNodeIsPrimary(myself) && !dbHasNoKeys()) {
73597356
addReplyError(c, "CLUSTER RESET can't be called with "
73607357
"master nodes containing keys");
73617358
return 1;

src/config.c

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -608,13 +608,6 @@ void loadServerConfigFromString(sds config) {
608608
goto loaderr;
609609
}
610610

611-
/* in case cluster mode is enabled dbnum must be 1 */
612-
if (server.cluster_enabled && server.dbnum > 1) {
613-
serverLog(LL_WARNING, "WARNING: Changing databases number from %d to 1 since we are in cluster mode",
614-
server.dbnum);
615-
server.dbnum = 1;
616-
}
617-
618611
/* To ensure backward compatibility and work while hz is out of range */
619612
if (server.hz < CONFIG_MIN_HZ) server.hz = CONFIG_MIN_HZ;
620613
if (server.hz > CONFIG_MAX_HZ) server.hz = CONFIG_MAX_HZ;
@@ -3263,7 +3256,8 @@ standardConfig static_configs[] = {
32633256
createEnumConfig("rdb-version-check", NULL, MODIFIABLE_CONFIG, rdb_version_check_enum, server.rdb_version_check, RDB_VERSION_CHECK_STRICT, NULL, NULL),
32643257

32653258
/* Integer configs */
3266-
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
3259+
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.config_databases, 16, INTEGER_CONFIG, NULL, NULL),
3260+
createIntConfig("cluster-databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.config_databases_cluster, 1, INTEGER_CONFIG, NULL, NULL),
32673261
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
32683262
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
32693263
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),

src/db.c

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -860,10 +860,6 @@ void selectCommand(client *c) {
860860

861861
if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK) return;
862862

863-
if (server.cluster_enabled && id != 0) {
864-
addReplyError(c, "SELECT is not allowed in cluster mode");
865-
return;
866-
}
867863
if (selectDb(c, id) == C_ERR) {
868864
addReplyError(c, "DB index is out of range");
869865
} else {
@@ -1429,11 +1425,6 @@ void moveCommand(client *c) {
14291425
int srcid, dbid;
14301426
long long expire;
14311427

1432-
if (server.cluster_enabled) {
1433-
addReplyError(c, "MOVE is not allowed in cluster mode");
1434-
return;
1435-
}
1436-
14371428
/* Obtain source and target DB pointers */
14381429
src = c->db;
14391430
srcid = c->db->id;
@@ -1518,11 +1509,6 @@ void copyCommand(client *c) {
15181509
}
15191510
}
15201511

1521-
if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) {
1522-
addReplyError(c, "Copying to another database is not allowed in cluster mode");
1523-
return;
1524-
}
1525-
15261512
/* If the user select the same DB as
15271513
* the source DB and using newkey as the same key
15281514
* it is probably an error. */
@@ -2851,3 +2837,12 @@ int bitfieldGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysRes
28512837
}
28522838
return 1;
28532839
}
2840+
2841+
/* Return true when kvstoreSize() is zero for the 'keys' store of every one of
 * the server.dbnum databases; return false as soon as one is non-zero. */
bool dbHasNoKeys(void) {
2842+
for (int i = 0; i < server.dbnum; i++) {
2843+
if (kvstoreSize(server.db[i].keys) != 0) {
2844+
return false;
2845+
}
2846+
}
2847+
return true;
2848+
}

src/server.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2813,6 +2813,8 @@ void initServer(void) {
28132813
serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", strerror(errno));
28142814
exit(1);
28152815
}
2816+
2817+
server.dbnum = server.cluster_enabled ? server.config_databases_cluster : server.config_databases;
28162818
server.db = zmalloc(sizeof(serverDb) * server.dbnum);
28172819

28182820
/* Create the databases, and initialize other internal state. */

src/server.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include <limits.h>
4747
#include <unistd.h>
4848
#include <errno.h>
49+
#include <stdbool.h>
4950
#include <inttypes.h>
5051
#include <pthread.h>
5152
#include <syslog.h>
@@ -1793,7 +1794,9 @@ struct valkeyServer {
17931794
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from
17941795
within the main dict scan */
17951796
size_t client_max_querybuf_len; /* Limit for client query buffer length */
1796-
int dbnum; /* Total number of configured DBs */
1797+
int config_databases; /* Total number of configured DBs in standalone */
1798+
int config_databases_cluster; /* Total number of configured DBs in cluster mode */
1799+
int dbnum; /* Total number of initialized DBs */
17971800
int supervised; /* 1 if supervised, 0 otherwise. */
17981801
int supervised_mode; /* See SUPERVISED_* */
17991802
int daemonize; /* True if running as a daemon */
@@ -3500,6 +3503,7 @@ int zmpopGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResult
35003503
int bzmpopGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResult *result);
35013504
int setGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResult *result);
35023505
int bitfieldGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResult *result);
3506+
bool dbHasNoKeys(void);
35033507

35043508
unsigned short crc16(const char *buf, int len);
35053509

src/valkey-benchmark.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
717717
* buffer with the SELECT command, that will be discarded the first
718718
* time the replies are received, so if the client is reused the
719719
* SELECT command will not be used again. */
720-
if (config.conn_info.input_dbnum != 0 && !is_cluster_client) {
720+
if (config.conn_info.input_dbnum) {
721721
c->obuf = sdscatprintf(c->obuf, "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", (int)sdslen(config.input_dbnumstr),
722722
config.input_dbnumstr);
723723
c->prefix_pending++;

0 commit comments

Comments
 (0)