Skip to content

Commit c9c49b4

Browse files
authored
Introduce MPTCP for replication (#1961)
Allow replicas to use MPTCP in the outgoing replication connection. A new yes/no config is introduced `repl-mptcp`, default `no`. For MPTCP to be used in replication, the primary needs to be configured with `mptcp yes` and the replica with `repl-mptcp yes`. Otherwise, the connection falls back to regular TCP. Follow-up of #1811. --------- Signed-off-by: zhenwei pi <pizhenwei@bytedance.com>
1 parent 249495a commit c9c49b4

12 files changed

+49
-25
lines changed

src/anet.c

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,25 @@ static int anetCreateSocket(char *err, int domain, int type, int protocol, int f
441441
return s;
442442
}
443443

444+
/* XXX: Until glibc 2.41, getaddrinfo with hints.ai_protocol of IPPROTO_MPTCP leads error.
445+
* Use hints.ai_protocol IPPROTO_IP (0) or IPPROTO_TCP (6) to resolve address and overwrite
446+
* it when MPTCP is enabled.
447+
* Ref: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/tools/testing/selftests/net/mptcp/mptcp_connect.c
448+
* https://sourceware.org/git/?p=glibc.git;a=commit;h=a8e9022e0f829d44a818c642fc85b3bfbd26a514
449+
*/
450+
static int anetTcpGetProtocol(int is_mptcp_enabled) {
451+
#ifdef IPPROTO_MPTCP
452+
return is_mptcp_enabled ? IPPROTO_MPTCP : IPPROTO_TCP;
453+
#else
454+
assert(!is_mptcp_enabled);
455+
return IPPROTO_TCP;
456+
#endif
457+
}
458+
444459
#define ANET_CONNECT_NONE 0
445460
#define ANET_CONNECT_NONBLOCK 1
446461
#define ANET_CONNECT_BE_BINDING 2 /* Best effort binding. */
462+
#define ANET_CONNECT_MPTCP 4
447463
static int anetTcpGenericConnect(char *err, const char *addr, int port, const char *source_addr, int flags) {
448464
int s = ANET_ERR, rv;
449465
char portstr[6]; /* strlen("65535") + 1; */
@@ -466,9 +482,10 @@ static int anetTcpGenericConnect(char *err, const char *addr, int port, const ch
466482
* Make sure connection-intensive things like the benchmark tool
467483
* will be able to close/open sockets a zillion of times.
468484
*/
485+
int ai_protocol = anetTcpGetProtocol(flags & ANET_CONNECT_MPTCP);
469486
int sockflags = ANET_SOCKET_CLOEXEC | ANET_SOCKET_REUSEADDR;
470487
if (flags & ANET_CONNECT_NONBLOCK) sockflags |= ANET_SOCKET_NONBLOCK;
471-
if ((s = anetCreateSocket(err, p->ai_family, p->ai_socktype, p->ai_protocol, sockflags)) == ANET_ERR) continue;
488+
if ((s = anetCreateSocket(err, p->ai_family, p->ai_socktype, ai_protocol, sockflags)) == ANET_ERR) continue;
472489
if (source_addr) {
473490
int bound = 0;
474491
/* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
@@ -528,8 +545,10 @@ int anetTcpNonBlockConnect(char *err, const char *addr, int port) {
528545
return anetTcpGenericConnect(err, addr, port, NULL, ANET_CONNECT_NONBLOCK);
529546
}
530547

531-
int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port, const char *source_addr) {
532-
return anetTcpGenericConnect(err, addr, port, source_addr, ANET_CONNECT_NONBLOCK | ANET_CONNECT_BE_BINDING);
548+
int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port, const char *source_addr, int mptcp) {
549+
int flags = ANET_CONNECT_NONBLOCK | ANET_CONNECT_BE_BINDING;
550+
if (mptcp) flags |= ANET_CONNECT_MPTCP;
551+
return anetTcpGenericConnect(err, addr, port, source_addr, flags);
533552
}
534553

535554
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog, mode_t perm, char *group) {
@@ -574,21 +593,6 @@ static int anetV6Only(char *err, int s) {
574593
return ANET_OK;
575594
}
576595

577-
/* XXX: Until glibc 2.41, getaddrinfo with hints.ai_protocol of IPPROTO_MPTCP leads error.
578-
* Use hints.ai_protocol IPPROTO_IP (0) or IPPROTO_TCP (6) to resolve address and overwrite
579-
* it when MPTCP is enabled.
580-
* Ref: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/tools/testing/selftests/net/mptcp/mptcp_connect.c
581-
* https://sourceware.org/git/?p=glibc.git;a=commit;h=a8e9022e0f829d44a818c642fc85b3bfbd26a514
582-
*/
583-
static int anetTcpGetProtocol(int is_mptcp_enabled) {
584-
#ifdef IPPROTO_MPTCP
585-
return is_mptcp_enabled ? IPPROTO_MPTCP : IPPROTO_TCP;
586-
#else
587-
assert(!is_mptcp_enabled);
588-
return IPPROTO_TCP;
589-
#endif
590-
}
591-
592596
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog, int mptcp) {
593597
int s = -1, rv;
594598
char _port[6]; /* strlen("65535") */

src/anet.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
#endif
5353

5454
int anetTcpNonBlockConnect(char *err, const char *addr, int port);
55-
int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port, const char *source_addr);
55+
int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port, const char *source_addr, int mptcp);
5656
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len, int flags);
5757
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int mptcp);
5858
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int mptcp);

src/cluster_legacy.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5315,7 +5315,7 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now) {
53155315
clusterLink *link = createClusterLink(node);
53165316
link->conn = connCreate(connTypeOfCluster());
53175317
connSetPrivateData(link->conn, link);
5318-
if (connConnect(link->conn, node->ip, node->cport, server.bind_source_addr, clusterLinkConnectHandler) ==
5318+
if (connConnect(link->conn, node->ip, node->cport, server.bind_source_addr, 0, clusterLinkConnectHandler) ==
53195319
C_ERR) {
53205320
/* We got a synchronous error from connect before
53215321
* clusterSendPing() had a chance to be called.

src/config.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3167,6 +3167,7 @@ standardConfig static_configs[] = {
31673167
createBoolConfig("lazyfree-lazy-user-del", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_del, 1, NULL, NULL),
31683168
createBoolConfig("lazyfree-lazy-user-flush", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_flush, 1, NULL, NULL),
31693169
createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL),
3170+
createBoolConfig("repl-mptcp", NULL, IMMUTABLE_CONFIG, server.repl_mptcp, 0, isValidMptcp, NULL),
31703171
createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 1, NULL, NULL),
31713172
createBoolConfig("dual-channel-replication-enabled", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.dual_channel_replication, 0, NULL, NULL),
31723173
createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.aof_rewrite_incremental_fsync, 1, NULL, NULL),

src/connection.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ typedef struct ConnectionType {
9494
const char *addr,
9595
int port,
9696
const char *source_addr,
97+
int multipath,
9798
ConnectionCallbackFunc connect_handler);
9899
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
99100
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
@@ -188,8 +189,9 @@ static inline int connConnect(connection *conn,
188189
const char *addr,
189190
int port,
190191
const char *src_addr,
192+
int multipath,
191193
ConnectionCallbackFunc connect_handler) {
192-
return conn->type->connect(conn, addr, port, src_addr, connect_handler);
194+
return conn->type->connect(conn, addr, port, src_addr, multipath, connect_handler);
193195
}
194196

195197
/* Blocking connect.

src/rdma.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1184,11 +1184,15 @@ static int connRdmaConnect(connection *conn,
11841184
const char *addr,
11851185
int port,
11861186
const char *src_addr,
1187+
int multipath,
11871188
ConnectionCallbackFunc connect_handler) {
11881189
rdma_connection *rdma_conn = (rdma_connection *)conn;
11891190
struct rdma_cm_id *cm_id;
11901191
RdmaContext *ctx;
11911192

1193+
/* RDMA does not support multipath, and there is no outgoing RDMA connection at the current stage */
1194+
assert(!multipath);
1195+
11921196
if (rdmaResolveAddr(rdma_conn, addr, port, src_addr) == C_ERR) {
11931197
return C_ERR;
11941198
}

src/replication.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3941,7 +3941,7 @@ void syncWithPrimary(connection *conn) {
39413941
/* Create RDB connection */
39423942
server.repl_rdb_transfer_s = connCreate(connTypeOfReplication());
39433943
if (connConnect(server.repl_rdb_transfer_s, server.primary_host, server.primary_port, server.bind_source_addr,
3944-
dualChannelFullSyncWithPrimary) == C_ERR) {
3944+
server.repl_mptcp, dualChannelFullSyncWithPrimary) == C_ERR) {
39453945
dualChannelServerLog(LL_WARNING, "Unable to connect to Primary: %s",
39463946
connGetLastError(server.repl_transfer_s));
39473947
connClose(server.repl_rdb_transfer_s);
@@ -3978,7 +3978,7 @@ void syncWithPrimary(connection *conn) {
39783978
int connectWithPrimary(void) {
39793979
server.repl_transfer_s = connCreate(connTypeOfReplication());
39803980
if (connConnect(server.repl_transfer_s, server.primary_host, server.primary_port, server.bind_source_addr,
3981-
syncWithPrimary) == C_ERR) {
3981+
server.repl_mptcp, syncWithPrimary) == C_ERR) {
39823982
serverLog(LL_WARNING, "Unable to connect to PRIMARY: %s", connGetLastError(server.repl_transfer_s));
39833983
connClose(server.repl_transfer_s);
39843984
server.repl_transfer_s = NULL;

src/server.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1979,6 +1979,7 @@ struct valkeyServer {
19791979
int repl_replica_ignore_maxmemory; /* If true replicas do not evict. */
19801980
time_t repl_down_since; /* Unix time at which link with primary went down */
19811981
int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
1982+
int repl_mptcp; /* Use Multipath TCP for replica on client side */
19821983
int replica_priority; /* Reported in INFO and used by Sentinel. */
19831984
int replica_announced; /* If true, replica is announced by Sentinel */
19841985
int replica_announce_port; /* Give the primary this listening port. */

src/socket.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,9 @@ static int connSocketConnect(connection *conn,
106106
const char *addr,
107107
int port,
108108
const char *src_addr,
109+
int multipath,
109110
ConnectionCallbackFunc connect_handler) {
110-
int fd = anetTcpNonBlockBestEffortBindConnect(NULL, addr, port, src_addr);
111+
int fd = anetTcpNonBlockBestEffortBindConnect(NULL, addr, port, src_addr, multipath);
111112
if (fd == -1) {
112113
conn->state = CONN_STATE_ERROR;
113114
conn->last_errno = errno;

src/tls.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,7 @@ static int connTLSConnect(connection *conn_,
880880
const char *addr,
881881
int port,
882882
const char *src_addr,
883+
int multipath,
883884
ConnectionCallbackFunc connect_handler) {
884885
tls_connection *conn = (tls_connection *)conn_;
885886
unsigned char addr_buf[sizeof(struct in6_addr)];
@@ -893,7 +894,7 @@ static int connTLSConnect(connection *conn_,
893894
}
894895

895896
/* Initiate Socket connection first */
896-
if (connectionTypeTcp()->connect(conn_, addr, port, src_addr, connect_handler) == C_ERR) return C_ERR;
897+
if (connectionTypeTcp()->connect(conn_, addr, port, src_addr, multipath, connect_handler) == C_ERR) return C_ERR;
897898

898899
/* Return now, once the socket is connected we'll initiate
899900
* TLS connection from the event handler.

tests/unit/introspection.tcl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,7 @@ start_server {tags {"introspection"}} {
921921
daemonize
922922
tcp-backlog
923923
mptcp
924+
repl-mptcp
924925
always-show-logo
925926
syslog-enabled
926927
cluster-enabled

valkey.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,15 @@ dual-channel-replication-enabled no
830830
# be a good idea.
831831
repl-disable-tcp-nodelay no
832832

833+
# Enables MPTCP for the replica's connection to the primary.
834+
#
835+
# An MPTCP connection is established between the primary and the replica if
836+
# the replica has set 'repl-mptcp yes' and the primary has set 'mptcp yes'.
837+
# Otherwise, it will automatically and implicitly fall back to a regular TCP
838+
# connection.
839+
#
840+
# repl-mptcp no
841+
833842
# Set the replication backlog size. The backlog is a buffer that accumulates
834843
# replica data when replicas are disconnected for some time, so that when a
835844
# replica wants to reconnect again, often a full resync is not needed, but a

0 commit comments

Comments
 (0)