Skip to content

Commit 41c18e4

Browse files
author
Ganeshwara Hananda
authored
Improve failover mechanism for primary and secondary replica connections (#243)
## What is the goal of this PR? We have increased resilience by improving the failover mechanism between replicas. ## What are the changes implemented in this PR? - Remove the terminology leader / non-leader which are Raft specific. We now use the terminology "replica" to refer to a single copy of a database. The active replica that can receive data is now called "primary replica" whereas the passive ones "secondary replica" - Split `GraknOptions` to `GraknOptions.core()` and `GraknOptions.cluster()`, the later of which contains an option to read from secondary replica - Increase resilience: - Database and cluster discovery will now be re-attempted to all cluster members instead of just to one of them - When the cluster have not decided which replica is the primary replica, the client will wait and retry instead of simply failing - Changed info-level log to debug
1 parent d2eb1ec commit 41c18e4

File tree

13 files changed

+258
-134
lines changed

13 files changed

+258
-134
lines changed

Grakn.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ interface Transaction extends AutoCloseable {
114114

115115
enum Type {
116116
READ(0),
117-
READ_REPLICA(2),
118117
WRITE(1);
119118

120119
private final int id;

GraknClient.java

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,21 @@
3636
import java.util.Map;
3737
import java.util.Set;
3838
import java.util.concurrent.TimeUnit;
39-
import java.util.concurrent.atomic.AtomicInteger;
4039
import java.util.stream.Collectors;
4140

42-
import static grakn.client.common.exception.ErrorMessage.Client.CLUSTER_NOT_AVAILABLE;
41+
import static grakn.client.common.exception.ErrorMessage.Client.CLUSTER_UNABLE_TO_CONNECT;
42+
import static grakn.client.common.exception.ErrorMessage.Client.ILLEGAL_ARGUMENT;
4343
import static grakn.common.collection.Collections.pair;
4444

4545
public class GraknClient {
4646
public static final String DEFAULT_ADDRESS = "localhost:1729";
4747

48-
public static GraknClient.Core core() {
48+
public static Core core() {
4949
return core(DEFAULT_ADDRESS);
5050
}
5151

52-
public static GraknClient.Core core(String address) {
53-
return new GraknClient.Core(address);
52+
public static Core core(String address) {
53+
return new Core(address);
5454
}
5555

5656
public static GraknClient.Cluster cluster() {
@@ -72,7 +72,7 @@ private Core(String address) {
7272

7373
@Override
7474
public RPCSession.Core session(String database, Grakn.Session.Type type) {
75-
return session(database, type, new GraknOptions());
75+
return session(database, type, GraknOptions.core());
7676
}
7777

7878
@Override
@@ -106,23 +106,20 @@ public Channel channel() {
106106

107107
public static class Cluster implements Grakn.Client {
108108
private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
109-
private final Map<Address.Cluster.Server, Core> coreClientMap;
110-
private final Core[] coreClientArray;
111-
private final AtomicInteger selectedCoreClientIndex;
112-
private GraknClusterGrpc.GraknClusterBlockingStub clusterDiscoveryRPC;
109+
private final Map<Address.Cluster.Server, Core> coreClients;
110+
private final Map<Address.Cluster.Server, GraknClusterGrpc.GraknClusterBlockingStub> graknClusterRPCs;
113111
private final RPCDatabaseManager.Cluster databases;
114112
private boolean isOpen;
115113

116114
private Cluster(String address) {
117-
Pair<GraknClusterGrpc.GraknClusterBlockingStub, Set<Address.Cluster.Server>> discovery = discoverCluster(address);
118-
clusterDiscoveryRPC = discovery.first();
119-
coreClientMap = discovery.second().stream()
115+
coreClients = discoverCluster(address).stream()
120116
.map(addr -> pair(addr, new Core(addr.client())))
121117
.collect(Collectors.toMap(Pair::first, Pair::second));
122-
coreClientArray = coreClientMap.values().toArray(new Core[] {});
123-
selectedCoreClientIndex = new AtomicInteger();
118+
graknClusterRPCs = coreClients.entrySet().stream()
119+
.map(client -> pair(client.getKey(), GraknClusterGrpc.newBlockingStub(client.getValue().channel())))
120+
.collect(Collectors.toMap(Pair::first, Pair::second));
124121
databases = new RPCDatabaseManager.Cluster(
125-
coreClientMap.entrySet().stream()
122+
coreClients.entrySet().stream()
126123
.map(client -> pair(client.getKey(), client.getValue().databases()))
127124
.collect(Collectors.toMap(Pair::first, Pair::second))
128125
);
@@ -131,18 +128,13 @@ private Cluster(String address) {
131128

132129
@Override
133130
public RPCSession.Cluster session(String database, Grakn.Session.Type type) {
134-
return session(database, type, new GraknOptions());
131+
return session(database, type, GraknOptions.cluster());
135132
}
136133

137134
@Override
138135
public RPCSession.Cluster session(String database, Grakn.Session.Type type, GraknOptions options) {
139-
return new RPCSession.Cluster(this, database, type, options, clusterDiscoveryRPC);
140-
}
141-
142-
public GraknClusterGrpc.GraknClusterBlockingStub selectNextClusterDiscoveryRPC() {
143-
Core selected = selectNextCoreClient();
144-
clusterDiscoveryRPC = GraknClusterGrpc.newBlockingStub(selected.channel());
145-
return clusterDiscoveryRPC;
136+
if (!options.isCluster()) throw new GraknClientException(ILLEGAL_ARGUMENT, options);
137+
return new RPCSession.Cluster(this, database, type, options.asCluster());
146138
}
147139

148140
@Override
@@ -157,33 +149,42 @@ public boolean isOpen() {
157149

158150
@Override
159151
public void close() {
160-
coreClientMap.values().forEach(GraknClient.Core::close);
152+
coreClients.values().forEach(GraknClient.Core::close);
161153
isOpen = false;
162154
}
163155

164-
public Map<Address.Cluster.Server, Core> coreClients() {
165-
return coreClientMap;
156+
public Set<Address.Cluster.Server> servers() {
157+
return coreClients.keySet();
158+
}
159+
160+
public Core coreClient(Address.Cluster.Server address) {
161+
return coreClients.get(address);
162+
}
163+
164+
public GraknClusterGrpc.GraknClusterBlockingStub graknClusterRPC(Address.Cluster.Server address) {
165+
return graknClusterRPCs.get(address);
166166
}
167167

168-
private Pair<GraknClusterGrpc.GraknClusterBlockingStub, Set<Address.Cluster.Server>> discoverCluster(String... addresses) {
168+
private Set<Address.Cluster.Server> discoverCluster(String... addresses) {
169169
for (String address: addresses) {
170-
try (GraknClient.Core client = new Core(address)) {
171-
LOG.info("Performing server discovery to {}...", address);
172-
GraknClusterGrpc.GraknClusterBlockingStub clusterDiscoveryRPC = GraknClusterGrpc.newBlockingStub(client.channel());
170+
try (Core client = new Core(address)) {
171+
LOG.debug("Performing server discovery to {}...", address);
172+
GraknClusterGrpc.GraknClusterBlockingStub graknClusterRPC = GraknClusterGrpc.newBlockingStub(client.channel());
173173
ClusterProto.Cluster.Discover.Res res =
174-
clusterDiscoveryRPC.clusterDiscover(ClusterProto.Cluster.Discover.Req.newBuilder().build());
174+
graknClusterRPC.clusterDiscover(ClusterProto.Cluster.Discover.Req.newBuilder().build());
175175
Set<Address.Cluster.Server> servers = res.getServersList().stream().map(Address.Cluster.Server::parse).collect(Collectors.toSet());
176-
LOG.info("Discovered {}", servers);
177-
return pair(clusterDiscoveryRPC, servers);
176+
LOG.debug("Discovered {}", servers);
177+
return servers;
178178
} catch (StatusRuntimeException e) {
179179
LOG.error("Server discovery to {} failed.", address);
180180
}
181181
}
182-
throw new GraknClientException(CLUSTER_NOT_AVAILABLE.message((Object) addresses)); // remove ambiguity by casting to Object
182+
throw clusterNotAvailableException();
183183
}
184184

185-
private Core selectNextCoreClient() {
186-
return coreClientArray[selectedCoreClientIndex.getAndIncrement() % coreClientMap.size()];
185+
private GraknClientException clusterNotAvailableException() {
186+
String addresses = servers().stream().map(Address.Cluster.Server::toString).collect(Collectors.joining(","));
187+
return new GraknClientException(CLUSTER_UNABLE_TO_CONNECT, addresses); // remove ambiguity by casting to Object
187188
}
188189
}
189190
}

GraknOptions.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import java.util.Optional;
2525

2626
import static grakn.client.common.exception.ErrorMessage.Client.NEGATIVE_BATCH_SIZE;
27+
import static grakn.client.common.exception.ErrorMessage.Internal.ILLEGAL_CAST;
2728

2829
public class GraknOptions {
29-
3030
private Boolean infer = null;
3131
private Boolean explain = null;
3232
private Integer batchSize = null;
@@ -60,4 +60,45 @@ public GraknOptions batchSize(int batchSize) {
6060
this.batchSize = batchSize;
6161
return this;
6262
}
63+
64+
public static GraknOptions core() {
65+
return new GraknOptions();
66+
}
67+
68+
public static GraknOptions.Cluster cluster() {
69+
return new Cluster();
70+
}
71+
72+
public boolean isCluster() {
73+
return false;
74+
}
75+
76+
public Cluster asCluster() {
77+
throw new GraknClientException(ILLEGAL_CAST, Cluster.class);
78+
}
79+
80+
public static class Cluster extends GraknOptions {
81+
private Boolean allowSecondaryReplica = null;
82+
83+
Cluster() {}
84+
85+
public Optional<Boolean> allowSecondaryReplica() {
86+
return Optional.ofNullable(allowSecondaryReplica);
87+
}
88+
89+
public Cluster allowSecondaryReplica(boolean primaryReplica) {
90+
this.allowSecondaryReplica = primaryReplica;
91+
return this;
92+
}
93+
94+
@Override
95+
public boolean isCluster() {
96+
return true;
97+
}
98+
99+
@Override
100+
public Cluster asCluster() {
101+
return this;
102+
}
103+
}
63104
}

GraknProtoBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ public static OptionsProto.Options options(GraknOptions options) {
2828
options.infer().ifPresent(builder::setInfer);
2929
options.explain().ifPresent(builder::setExplain);
3030
options.batchSize().ifPresent(builder::setBatchSize);
31+
32+
if (options.isCluster()) {
33+
options.asCluster().allowSecondaryReplica().ifPresent(builder::setAllowSecondaryReplica);
34+
}
35+
3136
return builder.build();
3237
}
3338
}

common/exception/ErrorMessage.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,15 @@ public static class Client extends ErrorMessage {
3535
public static final Client MISSING_RESPONSE =
3636
new Client(4, "The required field 'res' of type '%s' was not set.");
3737
public static final Client UNKNOWN_REQUEST_ID =
38-
new Client(5, "Received a response with unknown request id '%s'.");
38+
new Client(5, "Received a response with unknown request id '%s'.") ;
3939
public static final Client ILLEGAL_ARGUMENT = new Client(6, "Illegal argument passed into the method: '%s'.");
40-
41-
public static final Client CLUSTER_LEADER_NOT_YET_ELECTED =
42-
new Client(7, "No leader has been elected for latest known term '%s'.");
43-
44-
public static final Client CLUSTER_NOT_AVAILABLE =
45-
new Client(8, "Attempted connecting to these servers, but none are available: '%s'.");
46-
public static final Client UNABLE_TO_CONNECT =
47-
new Client(9, "Unable to connect to Grakn Core Server.");
40+
public static final Client UNABLE_TO_CONNECT = new Client(7, "Unable to connect to Grakn Core Server.");
41+
public static final Client CLUSTER_NO_PRIMARY_REPLICA_YET =
42+
new Client(8, "No replica has been marked as the primary replica for latest known term '%s'.");
43+
public static final Client CLUSTER_UNABLE_TO_CONNECT =
44+
new Client(9, "Unable to connect to Grakn Cluster. Attempted connecting to these servers, but none are available: '%s'.");
45+
public static final Client CLUSTER_REPLICA_NOT_PRIMARY =
46+
new Client(10, "The replica is not the primary replica");
4847

4948
private static final String codePrefix = "CLI";
5049
private static final String messagePrefix = "Illegal Client State";
@@ -87,8 +86,6 @@ public static class Query extends ErrorMessage {
8786
new Query(3, "The answer type '%s' was not recognised.");
8887
public static final Query MISSING_ANSWER =
8988
new Query(4, "The required field 'answer' of type '%s' was not set.");
90-
public static final Query ILLEGAL_CAST =
91-
new Query(5, "Illegal casting operation to '%s'.");
9289

9390
private static final String codePrefix = "QRY";
9491
private static final String messagePrefix = "Query Error";
@@ -97,4 +94,18 @@ public static class Query extends ErrorMessage {
9794
super(codePrefix, number, messagePrefix, message);
9895
}
9996
}
97+
98+
public static class Internal extends ErrorMessage {
99+
public static final Internal UNEXPECTED_INTERRUPTION =
100+
new Internal(1, "Unexpected thread interruption!");
101+
public static final Internal ILLEGAL_CAST =
102+
new Internal(2, "Illegal casting operation to '%s'.");
103+
104+
private static final String codePrefix = "INT";
105+
private static final String messagePrefix = "Internal Error";
106+
107+
Internal(int number, String message) {
108+
super(codePrefix, number, messagePrefix, message);
109+
}
110+
}
100111
}

common/exception/GraknClientException.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,17 @@ public GraknClientException(String error) {
3333
this.errorMessage = null;
3434
}
3535

36-
public GraknClientException(ErrorMessage error) {
37-
super(error.toString());
36+
public GraknClientException(ErrorMessage error, Object... parameters) {
37+
super(error.message(parameters));
3838
assert !getMessage().contains("%s");
3939
this.errorMessage = error;
4040
}
4141

4242
public static GraknClientException of(StatusRuntimeException statusRuntimeException) {
4343
if (statusRuntimeException.getStatus().getCode() == Status.Code.UNAVAILABLE) {
4444
return new GraknClientException(ErrorMessage.Client.UNABLE_TO_CONNECT);
45+
} else if (isReplicaNotPrimaryException(statusRuntimeException)) {
46+
return new GraknClientException(ErrorMessage.Client.CLUSTER_REPLICA_NOT_PRIMARY);
4547
}
4648
return new GraknClientException(statusRuntimeException.getStatus().getDescription());
4749
}
@@ -59,4 +61,9 @@ public String getName() {
5961
public ErrorMessage getErrorMessage() {
6062
return errorMessage;
6163
}
64+
65+
// TODO: propagate exception from the server side in a less-brittle way
66+
private static boolean isReplicaNotPrimaryException(StatusRuntimeException statusRuntimeException) {
67+
return statusRuntimeException.getStatus().getCode() == Status.Code.INTERNAL && statusRuntimeException.getStatus().getDescription().contains("[RPL01]");
68+
}
6269
}

common/tracing/TracingProtoBuilder.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
package grakn.client.common.tracing;
2121

2222
import grabl.tracing.client.GrablTracingThreadStatic;
23-
import grakn.client.GraknOptions;
24-
import grakn.protocol.OptionsProto;
2523

2624
import java.util.Collections;
2725
import java.util.HashMap;

concept/answer/Numeric.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import javax.annotation.Nullable;
2626

2727
import static grakn.client.common.exception.ErrorMessage.Query.BAD_ANSWER_TYPE;
28-
import static grakn.client.common.exception.ErrorMessage.Query.ILLEGAL_CAST;
28+
import static grakn.client.common.exception.ErrorMessage.Internal.ILLEGAL_CAST;
2929

3030
public class Numeric {
3131
@Nullable

dependencies/graknlabs/repositories.bzl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ def graknlabs_dependencies():
5050
def graknlabs_protocol():
5151
git_repository(
5252
name = "graknlabs_protocol",
53-
remote = "https://github.com/graknlabs/protocol",
54-
tag = "2.0.0-alpha-6", # sync-marker: do not remove this comment, this is used for sync-dependencies by @graknlabs_protocol
53+
remote = "https://github.com/lolski/protocol",
54+
commit = "6d5a5e1b58d91fd001e06b2820363e7194f7fd3f", # sync-marker: do not remove this comment, this is used for sync-dependencies by @graknlabs_protocol
5555
)
5656

5757
def graknlabs_behaviour():

0 commit comments

Comments
 (0)