Skip to content

Commit 0862f9f

Browse files
authored
Support MPTCP for valkey-cli and benchmark (#2067)
Add '--mptcp' for both valkey-cli and valkey-benchmark to enable MPTCP. Signed-off-by: zhenwei pi <zhenwei.pi@linux.dev> Signed-off-by: zhenwei pi <pizhenwei@bytedance.com>
1 parent 70106f0 commit 0862f9f

File tree

4 files changed

+31
-8
lines changed

4 files changed

+31
-8
lines changed

src/cli_common.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ sds cliVersion(void) {
430430
}
431431

432432
/* This is a wrapper to call valkeyConnect or valkeyConnectWithTimeout. */
433-
valkeyContext *valkeyConnectWrapper(enum valkeyConnectionType ct, const char *ip_or_path, int port, const struct timeval tv, int nonblock) {
433+
valkeyContext *valkeyConnectWrapper(enum valkeyConnectionType ct, const char *ip_or_path, int port, const struct timeval tv, int nonblock, int multipath) {
434434
valkeyOptions options = {0};
435435

436436
switch (ct) {
@@ -462,5 +462,10 @@ valkeyContext *valkeyConnectWrapper(enum valkeyConnectionType ct, const char *ip
462462
options.options |= VALKEY_OPT_NONBLOCK;
463463
}
464464

465+
if (multipath) {
466+
assert(ct == VALKEY_CONN_TCP); /* caller should make sure multipath is available for TCP only */
467+
options.options |= VALKEY_OPT_MPTCP;
468+
}
469+
465470
return valkeyConnectWithOptions(&options);
466471
}

src/cli_common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@ sds escapeJsonString(sds s, const char *p, size_t len);
5353

5454
sds cliVersion(void);
5555

56-
valkeyContext *valkeyConnectWrapper(enum valkeyConnectionType ct, const char *ip_or_path, int port, const struct timeval tv, int nonblock);
56+
valkeyContext *valkeyConnectWrapper(enum valkeyConnectionType ct, const char *ip_or_path, int port, const struct timeval tv, int nonblock, int multipath);
5757

5858
#endif /* __CLICOMMON_H */

src/valkey-benchmark.c

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ static struct config {
9191
enum valkeyConnectionType ct;
9292
cliConnInfo conn_info;
9393
int tls;
94+
int mptcp;
9495
struct cliSSLconfig sslconfig;
9596
int numclients;
9697
_Atomic int liveclients;
@@ -248,7 +249,7 @@ static valkeyContext *getValkeyContext(enum valkeyConnectionType ct, const char
248249
valkeyContext *ctx = NULL;
249250
valkeyReply *reply = NULL;
250251
struct timeval tv = {0};
251-
ctx = valkeyConnectWrapper(ct, ip_or_path, port, tv, 0);
252+
ctx = valkeyConnectWrapper(ct, ip_or_path, port, tv, 0, config.mptcp);
252253
if (ctx == NULL || ctx->err) {
253254
fprintf(stderr, "Could not connect to server at ");
254255
char *err = (ctx != NULL ? ctx->errstr : "");
@@ -666,7 +667,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
666667
c->cluster_node = node;
667668
}
668669

669-
c->context = valkeyConnectWrapper(config.ct, ip, port, tv, 1);
670+
c->context = valkeyConnectWrapper(config.ct, ip, port, tv, 1, config.mptcp);
670671
if (c->context->err) {
671672
fprintf(stderr, "Could not connect to server at ");
672673
if (config.ct != VALKEY_CONN_UNIX || is_cluster_client)
@@ -1490,6 +1491,8 @@ int parseOptions(int argc, char **argv) {
14901491
}
14911492
config.ct = VALKEY_CONN_RDMA;
14921493
#endif
1494+
} else if (!strcmp(argv[i], "--mptcp")) {
1495+
config.mptcp = 1;
14931496
} else {
14941497
/* Assume the user meant to provide an option when the arg starts
14951498
* with a dash. We're done otherwise and should use the remainder
@@ -1602,6 +1605,7 @@ int parseOptions(int argc, char **argv) {
16021605
" the 'fcall' test. (default 1)\n",
16031606
tls_usage,
16041607
rdma_usage,
1608+
" --mptcp Enable an MPTCP connection.\n"
16051609
" --help Output this help and exit.\n"
16061610
" --version Output version and exit.\n\n"
16071611
"Examples:\n\n"
@@ -1780,6 +1784,11 @@ int main(int argc, char **argv) {
17801784
}
17811785
#endif
17821786

1787+
if (config.mptcp && (config.ct != VALKEY_CONN_TCP)) {
1788+
fprintf(stderr, "Options --mptcp is only supported by TCP\n");
1789+
exit(1);
1790+
}
1791+
17831792
if (config.cluster_mode) {
17841793
// We only include the slot placeholder {tag} if cluster mode is enabled
17851794
tag = ":{tag}";

src/valkey-cli.c

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ static struct config {
219219
cliConnInfo conn_info; /* conn_info.hostip is used as unix socket path on ct == VALKEY_CONN_UNIX */
220220
struct timeval connect_timeout;
221221
int tls;
222+
int mptcp;
222223
cliSSLconfig sslconfig;
223224
long repeat;
224225
long interval;
@@ -1639,7 +1640,7 @@ static int cliConnect(int flags) {
16391640
cliRefreshPrompt();
16401641
}
16411642

1642-
context = valkeyConnectWrapper(config.ct, config.conn_info.hostip, config.conn_info.hostport, config.connect_timeout, 0);
1643+
context = valkeyConnectWrapper(config.ct, config.conn_info.hostip, config.conn_info.hostport, config.connect_timeout, 0, config.mptcp);
16431644

16441645
if (!context->err && config.tls) {
16451646
const char *err = NULL;
@@ -2526,7 +2527,7 @@ static valkeyReply *reconnectingValkeyCommand(valkeyContext *c, const char *fmt,
25262527
fflush(stdout);
25272528

25282529
valkeyFree(c);
2529-
c = valkeyConnectWrapper(config.ct, config.conn_info.hostip, config.conn_info.hostport, config.connect_timeout, 0);
2530+
c = valkeyConnectWrapper(config.ct, config.conn_info.hostip, config.conn_info.hostport, config.connect_timeout, 0, config.mptcp);
25302531
if (!c->err && config.tls) {
25312532
const char *err = NULL;
25322533
if (cliSecureConnection(c, config.sslconfig, &err) == VALKEY_ERR && err) {
@@ -2831,6 +2832,8 @@ static int parseOptions(int argc, char **argv) {
28312832
}
28322833
config.ct = VALKEY_CONN_RDMA;
28332834
#endif
2835+
} else if (!strcmp(argv[i], "--mptcp")) {
2836+
config.mptcp = 1;
28342837
} else if (!strcmp(argv[i], "-v") || !strcmp(argv[i], "--version")) {
28352838
sds version = cliVersion();
28362839
printf("valkey-cli %s\n", version);
@@ -2910,6 +2913,11 @@ static int parseOptions(int argc, char **argv) {
29102913
exit(1);
29112914
}
29122915

2916+
if (config.mptcp && (config.ct != VALKEY_CONN_TCP)) {
2917+
fprintf(stderr, "Options --mptcp is only supported by TCP.\n");
2918+
exit(1);
2919+
}
2920+
29132921
return i;
29142922
}
29152923

@@ -3001,6 +3009,7 @@ static void usage(int err) {
30013009
" -6 Prefer IPv6 over IPv4 on DNS lookup.\n"
30023010
"%s"
30033011
"%s"
3012+
" --mptcp Enable an MPTCP connection.\n"
30043013
" --raw Use raw formatting for replies (default when STDOUT is\n"
30053014
" not a tty).\n"
30063015
" --no-raw Force formatted output even when STDOUT is not a tty.\n"
@@ -3985,7 +3994,7 @@ static int clusterManagerExecTransaction(clusterManagerNode *node, clusterManage
39853994

39863995
static int clusterManagerNodeConnect(clusterManagerNode *node) {
39873996
if (node->context) valkeyFree(node->context);
3988-
node->context = valkeyConnectWrapper(config.ct, node->ip, node->port, config.connect_timeout, 0);
3997+
node->context = valkeyConnectWrapper(config.ct, node->ip, node->port, config.connect_timeout, 0, config.mptcp);
39893998
if (!node->context->err && config.tls) {
39903999
const char *err = NULL;
39914000
if (cliSecureConnection(node->context, config.sslconfig, &err) == VALKEY_ERR && err) {
@@ -7727,7 +7736,7 @@ static int clusterManagerCommandImport(int argc, char **argv) {
77277736
char *reply_err = NULL;
77287737
valkeyReply *src_reply = NULL;
77297738
// Connect to the source node.
7730-
valkeyContext *src_ctx = valkeyConnectWrapper(config.ct, src_ip, src_port, config.connect_timeout, 0);
7739+
valkeyContext *src_ctx = valkeyConnectWrapper(config.ct, src_ip, src_port, config.connect_timeout, 0, config.mptcp);
77317740
if (src_ctx->err) {
77327741
success = 0;
77337742
fprintf(stderr, "Could not connect to Valkey at %s:%d: %s.\n", src_ip, src_port, src_ctx->errstr);

0 commit comments

Comments
 (0)