Skip to content

Commit c9b8bdc

Browse files
author
Ganeshwara Hananda
authored
Supports Grakn Cluster failover functionality (#232)
## What is the goal of this PR? We have added support for Grakn Cluster failover, allowing the client to switch between talking with one server to another. With this functionality, the users can now benefit from Cluster's high-availability and horizontal scalability. Instantiating a client for Grakn Cluster is similar to instantiating a client for Grakn Core. When it connects to the server, it will retrieve complete information about the whole cluster and other servers that belong there. ``` GraknClient.Cluster client = new GraknClient.Cluster("172.0.0.1:1729"); ``` When connected to Grakn Cluster, the client and session instances are bound to the whole cluster rather than a particular server. That means that if one server in the cluster crashes, the client and session instances are still fully operational and will simply failover to other servers that are available. Transaction instances however, are bound to the server they connect to and cannot failover to other servers midway. Therefore if a server crashes, the transactions will simply fail. Apart from the failover ability, the client behaviour when connecting to Grakn Cluster is otherwise identical to how it behaves when connecting to Grakn Core. ## What are the changes implemented in this PR? Grakn Cluster specific implementations of the following component: - `GraknClient.Cluster`: a client implementation that is designed to work with a cluster instance. It contains an array of `GraknClient.Core`, one for each server in the cluster - `RPCSession.Cluster`: a session implementation that is designed to work with a cluster instance. It contains an array of `RPCSession.Core`, one for each server in the cluster. - `RPCDatabaseManager.Cluster`: a database manager implementation for managing databases clusters
1 parent c026607 commit c9b8bdc

File tree

14 files changed

+598
-135
lines changed

14 files changed

+598
-135
lines changed

.grabl/automation.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ build:
5252
bazel run @graknlabs_dependencies//distribution/artifact:create-netrc
5353
bazel build //...
5454
bazel run @graknlabs_dependencies//tool/checkstyle:test-coverage
55-
bazel test $(bazel query 'kind(checkstyle_test, //...)')
55+
bazel test $(bazel query 'kind(checkstyle_test, //...)') --test_output=errors
5656
build-dependency:
5757
image: graknlabs-ubuntu-20.04
5858
command: |

BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ java_library(
5151
"@graknlabs_graql//java/pattern",
5252
"@graknlabs_graql//java/query",
5353
"@graknlabs_protocol//grpc/java:protocol",
54+
"@graknlabs_protocol//grpc/java:protocol-cluster",
5455

5556
"@graknlabs_grabl_tracing//client",
5657

Grakn.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ interface Transaction extends AutoCloseable {
114114

115115
enum Type {
116116
READ(0),
117+
READ_REPLICA(2),
117118
WRITE(1);
118119

119120
private final int id;

GraknClient.java

Lines changed: 143 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,63 +19,171 @@
1919

2020
package grakn.client;
2121

22-
import grakn.client.Grakn.Client;
23-
import grakn.client.Grakn.DatabaseManager;
24-
import grakn.client.Grakn.Session;
22+
import grakn.client.common.exception.GraknClientException;
23+
import grakn.client.rpc.Address;
2524
import grakn.client.rpc.RPCDatabaseManager;
2625
import grakn.client.rpc.RPCSession;
26+
import grakn.common.collection.Pair;
27+
import grakn.protocol.cluster.ClusterProto;
28+
import grakn.protocol.cluster.GraknClusterGrpc;
2729
import io.grpc.Channel;
2830
import io.grpc.ManagedChannel;
2931
import io.grpc.ManagedChannelBuilder;
32+
import io.grpc.StatusRuntimeException;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3035

36+
import java.util.Map;
37+
import java.util.Set;
3138
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.stream.Collectors;
3241

33-
public class GraknClient implements Client {
42+
import static grakn.client.common.exception.ErrorMessage.Client.CLUSTER_NOT_AVAILABLE;
43+
import static grakn.common.collection.Collections.pair;
3444

35-
public static final String DEFAULT_URI = "localhost:1729";
45+
public class GraknClient {
46+
public static final String DEFAULT_ADDRESS = "localhost:1729";
3647

37-
private final ManagedChannel channel;
38-
private final DatabaseManager databases;
39-
40-
public GraknClient() {
41-
this(DEFAULT_URI);
48+
public static GraknClient.Core core() {
49+
return core(DEFAULT_ADDRESS);
4250
}
4351

44-
public GraknClient(String address) {
45-
channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
46-
databases = new RPCDatabaseManager(channel);
52+
public static GraknClient.Core core(String address) {
53+
return new GraknClient.Core(address);
4754
}
4855

49-
@Override
50-
public Session session(String database, Session.Type type) {
51-
return session(database, type, new GraknOptions());
56+
public static GraknClient.Cluster cluster() {
57+
return new GraknClient.Cluster(DEFAULT_ADDRESS);
5258
}
5359

54-
@Override
55-
public Session session(String database, Session.Type type, GraknOptions options) {
56-
return new RPCSession(this, database, type, options);
60+
public static GraknClient.Cluster cluster(String address) {
61+
return new GraknClient.Cluster(address);
5762
}
5863

59-
@Override
60-
public DatabaseManager databases() {
61-
return databases;
62-
}
64+
public static class Core implements Grakn.Client {
65+
private final ManagedChannel channel;
66+
private final RPCDatabaseManager.Core databases;
6367

64-
@Override
65-
public boolean isOpen() {
66-
return !channel.isShutdown();
67-
}
68+
private Core(String address) {
69+
channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
70+
databases = new RPCDatabaseManager.Core(channel);
71+
}
6872

69-
@Override
70-
public void close() {
71-
try {
72-
channel.shutdown().awaitTermination(10, TimeUnit.SECONDS);
73-
} catch (InterruptedException e) {
74-
Thread.currentThread().interrupt();
73+
@Override
74+
public RPCSession.Core session(String database, Grakn.Session.Type type) {
75+
return session(database, type, new GraknOptions());
76+
}
77+
78+
@Override
79+
public RPCSession.Core session(String database, Grakn.Session.Type type, GraknOptions options) {
80+
return new RPCSession.Core(this, database, type, options);
81+
}
82+
83+
@Override
84+
public RPCDatabaseManager.Core databases() {
85+
return databases;
86+
}
87+
88+
@Override
89+
public boolean isOpen() {
90+
return !channel.isShutdown();
91+
}
92+
93+
@Override
94+
public void close() {
95+
try {
96+
channel.shutdown().awaitTermination(10, TimeUnit.SECONDS);
97+
} catch (InterruptedException e) {
98+
Thread.currentThread().interrupt();
99+
}
100+
}
101+
102+
public Channel channel() {
103+
return channel;
75104
}
76105
}
77106

78-
public Channel channel() {
79-
return channel;
107+
public static class Cluster implements Grakn.Client {
108+
private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
109+
private final Map<Address.Cluster.Server, Core> coreClientMap;
110+
private final Core[] coreClientArray;
111+
private final AtomicInteger selectedCoreClientIndex;
112+
private GraknClusterGrpc.GraknClusterBlockingStub clusterDiscoveryRPC;
113+
private final RPCDatabaseManager.Cluster databases;
114+
private boolean isOpen;
115+
116+
private Cluster(String address) {
117+
Pair<GraknClusterGrpc.GraknClusterBlockingStub, Set<Address.Cluster.Server>> discovery = discoverCluster(address);
118+
clusterDiscoveryRPC = discovery.first();
119+
coreClientMap = discovery.second().stream()
120+
.map(addr -> pair(addr, new Core(addr.client())))
121+
.collect(Collectors.toMap(Pair::first, Pair::second));
122+
coreClientArray = coreClientMap.values().toArray(new Core[] {});
123+
selectedCoreClientIndex = new AtomicInteger();
124+
databases = new RPCDatabaseManager.Cluster(
125+
coreClientMap.entrySet().stream()
126+
.map(client -> pair(client.getKey(), client.getValue().databases()))
127+
.collect(Collectors.toMap(Pair::first, Pair::second))
128+
);
129+
isOpen = true;
130+
}
131+
132+
@Override
133+
public RPCSession.Cluster session(String database, Grakn.Session.Type type) {
134+
return session(database, type, new GraknOptions());
135+
}
136+
137+
@Override
138+
public RPCSession.Cluster session(String database, Grakn.Session.Type type, GraknOptions options) {
139+
return new RPCSession.Cluster(this, database, type, options, clusterDiscoveryRPC);
140+
}
141+
142+
public GraknClusterGrpc.GraknClusterBlockingStub selectNextClusterDiscoveryRPC() {
143+
Core selected = selectNextCoreClient();
144+
clusterDiscoveryRPC = GraknClusterGrpc.newBlockingStub(selected.channel());
145+
return clusterDiscoveryRPC;
146+
}
147+
148+
@Override
149+
public RPCDatabaseManager.Cluster databases() {
150+
return databases;
151+
}
152+
153+
@Override
154+
public boolean isOpen() {
155+
return isOpen;
156+
}
157+
158+
@Override
159+
public void close() {
160+
coreClientMap.values().forEach(GraknClient.Core::close);
161+
isOpen = false;
162+
}
163+
164+
public Map<Address.Cluster.Server, Core> coreClients() {
165+
return coreClientMap;
166+
}
167+
168+
private Pair<GraknClusterGrpc.GraknClusterBlockingStub, Set<Address.Cluster.Server>> discoverCluster(String... addresses) {
169+
for (String address: addresses) {
170+
try (GraknClient.Core client = new Core(address)) {
171+
LOG.info("Performing server discovery to {}...", address);
172+
GraknClusterGrpc.GraknClusterBlockingStub clusterDiscoveryRPC = GraknClusterGrpc.newBlockingStub(client.channel());
173+
ClusterProto.Cluster.Discover.Res res =
174+
clusterDiscoveryRPC.clusterDiscover(ClusterProto.Cluster.Discover.Req.newBuilder().build());
175+
Set<Address.Cluster.Server> servers = res.getServersList().stream().map(Address.Cluster.Server::parse).collect(Collectors.toSet());
176+
LOG.info("Discovered {}", servers);
177+
return pair(clusterDiscoveryRPC, servers);
178+
} catch (StatusRuntimeException e) {
179+
LOG.error("Server discovery to {} failed.", address);
180+
}
181+
}
182+
throw new GraknClientException(CLUSTER_NOT_AVAILABLE.message((Object) addresses)); // remove ambiguity by casting to Object
183+
}
184+
185+
private Core selectNextCoreClient() {
186+
return coreClientArray[selectedCoreClientIndex.getAndIncrement() % coreClientMap.size()];
187+
}
80188
}
81189
}

common/exception/ErrorMessage.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ public static class Client extends ErrorMessage {
3636
new Client(4, "The required field 'res' of type '%s' was not set.");
3737
public static final Client UNKNOWN_REQUEST_ID =
3838
new Client(5, "Received a response with unknown request id '%s'.");
39+
public static final Client ILLEGAL_ARGUMENT = new Client(6, "Illegal argument passed into the method: '%s'.");
40+
41+
public static final Client CLUSTER_LEADER_NOT_YET_ELECTED =
42+
new Client(7, "No leader has been elected for latest known term '%s'.");
43+
44+
public static final Client CLUSTER_NOT_AVAILABLE =
45+
new Client(8, "Attempted connecting to these servers, but none are available: '%s'.");
3946

4047
private static final String codePrefix = "CLI";
4148
private static final String messagePrefix = "Illegal Client State";

dependencies/graknlabs/repositories.bzl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ def graknlabs_dependencies():
5050
def graknlabs_protocol():
5151
git_repository(
5252
name = "graknlabs_protocol",
53-
remote = "https://github.com/graknlabs/protocol",
54-
tag = "2.0.0-alpha-5", # sync-marker: do not remove this comment, this is used for sync-dependencies by @graknlabs_protocol
53+
remote = "https://github.com/lolski/protocol",
54+
commit = "b2cdf7d49617886a5643e1c1701e975a8418596c", # sync-marker: do not remove this comment, this is used for sync-dependencies by @graknlabs_protocol
5555
)
5656

5757
def graknlabs_behaviour():

rpc/Address.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package grakn.client.rpc;
21+
22+
import java.util.Objects;
23+
24+
import static java.lang.Integer.parseInt;
25+
26+
public class Address {
27+
public static class Cluster {
28+
public static class Server {
29+
private final String host;
30+
private final int clientPort;
31+
private final int serverPort;
32+
33+
public static Server parse(String address) {
34+
String[] split = address.split(":");
35+
return new Server(split[0], parseInt(split[1]), parseInt(split[2]));
36+
}
37+
38+
public Server(String host, int clientPort, int serverPort) {
39+
this.host = host;
40+
this.clientPort = clientPort;
41+
this.serverPort = serverPort;
42+
}
43+
44+
public String host() {
45+
return host;
46+
}
47+
48+
public int clientPort() {
49+
return clientPort;
50+
}
51+
52+
public int serverPort() {
53+
return serverPort;
54+
}
55+
56+
public String server() {
57+
return host + ":" + serverPort;
58+
}
59+
60+
public String client() {
61+
return host + ":" + clientPort;
62+
}
63+
64+
@Override
65+
public boolean equals(Object o) {
66+
if (this == o) return true;
67+
if (o == null || getClass() != o.getClass()) return false;
68+
Server server = (Server) o;
69+
return clientPort == server.clientPort &&
70+
serverPort == server.serverPort &&
71+
Objects.equals(host, server.host);
72+
}
73+
74+
@Override
75+
public int hashCode() {
76+
return Objects.hash(host, clientPort, serverPort);
77+
}
78+
79+
@Override
80+
public String toString() {
81+
return host + ":" + clientPort + ":" + serverPort;
82+
}
83+
}
84+
}
85+
86+
}

0 commit comments

Comments
 (0)