Skip to content

Commit 39ad902

Browse files
committed
Removed waiting of the grpc channel ready
1 parent 7c3d0e4 commit 39ad902

File tree

9 files changed

+19
-185
lines changed

9 files changed

+19
-185
lines changed

core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ public enum InitMode {
7575
private Executor callExecutor = MoreExecutors.directExecutor();
7676
private AuthRpcProvider<? super GrpcAuthRpc> authProvider = NopAuthProvider.INSTANCE;
7777
private long readTimeoutMillis = 0;
78-
private long connectTimeoutMillis = 30_000;
7978
private long discoveryTimeoutMillis = 60_000;
8079
private boolean useDefaultGrpcResolver = false;
8180
private GrpcCompression compression = GrpcCompression.NO_COMPRESSION;
@@ -147,8 +146,9 @@ public long getReadTimeoutMillis() {
147146
return readTimeoutMillis;
148147
}
149148

149+
@Deprecated
150150
public long getConnectTimeoutMillis() {
151-
return connectTimeoutMillis;
151+
return 10_000;
152152
}
153153

154154
public long getDiscoveryTimeoutMillis() {
@@ -296,15 +296,13 @@ public GrpcTransportBuilder withReadTimeout(long timeout, TimeUnit unit) {
296296
return this;
297297
}
298298

299+
@Deprecated
299300
public GrpcTransportBuilder withConnectTimeout(Duration timeout) {
300-
this.connectTimeoutMillis = timeout.toMillis();
301-
Preconditions.checkArgument(connectTimeoutMillis > 0, "connectTimeoutMillis must be greater than 0");
302301
return this;
303302
}
304303

304+
@Deprecated
305305
public GrpcTransportBuilder withConnectTimeout(long timeout, TimeUnit unit) {
306-
this.connectTimeoutMillis = unit.toMillis(timeout);
307-
Preconditions.checkArgument(connectTimeoutMillis > 0, "connectTimeoutMillis must be greater than 0");
308306
return this;
309307
}
310308

Lines changed: 13 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package tech.ydb.core.impl.pool;
22

3-
import java.util.concurrent.CompletableFuture;
4-
import java.util.concurrent.ExecutionException;
53
import java.util.concurrent.TimeUnit;
6-
import java.util.concurrent.TimeoutException;
74

85
import io.grpc.Channel;
96
import io.grpc.ConnectivityState;
@@ -14,25 +11,21 @@
1411
/**
1512
* @author Nikolay Perfilov
1613
*/
17-
public class GrpcChannel {
14+
public final class GrpcChannel implements Runnable {
15+
1816
/* Channel shutdown waits for finish of active grpc calls, so there must be enough time to complete them all */
1917
private static final long WAIT_FOR_CLOSING_MS = 5000;
2018
private static final Logger logger = LoggerFactory.getLogger(GrpcChannel.class);
2119

2220
private final EndpointRecord endpoint;
2321
private final ManagedChannel channel;
24-
private final long connectTimeoutMs;
25-
private final ReadyWatcher readyWatcher;
2622

2723
public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory) {
2824
try {
2925
logger.debug("Creating grpc channel with {}", endpoint);
3026
this.endpoint = endpoint;
31-
this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort(),
32-
endpoint.getAuthority());
33-
this.connectTimeoutMs = factory.getConnectTimeoutMs();
34-
this.readyWatcher = new ReadyWatcher();
35-
this.readyWatcher.checkState();
27+
this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort(), endpoint.getAuthority());
28+
checkState();
3629
} catch (Throwable th) {
3730
throw new RuntimeException("cannot create channel", th);
3831
}
@@ -43,7 +36,7 @@ public EndpointRecord getEndpoint() {
4336
}
4437

4538
public Channel getReadyChannel() {
46-
return readyWatcher.getReadyChannel();
39+
return channel;
4740
}
4841

4942
public boolean isShutdown() {
@@ -72,50 +65,14 @@ public boolean shutdown() {
7265
}
7366
}
7467

75-
private class ReadyWatcher implements Runnable {
76-
private final CompletableFuture<ManagedChannel> future = new CompletableFuture<>();
77-
78-
public Channel getReadyChannel() {
79-
try {
80-
return future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
81-
} catch (InterruptedException ex) {
82-
logger.warn("Grpc channel {} ready waiting is interrupted: ", endpoint, ex);
83-
Thread.currentThread().interrupt();
84-
} catch (ExecutionException ex) {
85-
logger.warn("Grpc channel {} connecting problem: ", endpoint, ex);
86-
throw new RuntimeException("Channel " + endpoint + " connecting problem", ex);
87-
} catch (TimeoutException ex) {
88-
logger.warn("Grpc channel {} connect timeout exceeded", endpoint);
89-
throw new RuntimeException("Channel " + endpoint + " connecting timeout");
90-
}
91-
return null;
92-
}
93-
94-
public void checkState() {
95-
ConnectivityState state = channel.getState(true);
96-
logger.debug("Grpc channel {} new state: {}", endpoint, state);
97-
switch (state) {
98-
case READY:
99-
future.complete(channel);
100-
// keep tracking channel state
101-
channel.notifyWhenStateChanged(state, this);
102-
break;
103-
case SHUTDOWN:
104-
future.completeExceptionally(new IllegalStateException("Grpc channel already closed"));
105-
break;
106-
case TRANSIENT_FAILURE:
107-
case CONNECTING:
108-
case IDLE:
109-
default:
110-
// keep tracking channel state
111-
channel.notifyWhenStateChanged(state, this);
112-
break;
113-
}
114-
}
68+
private void checkState() {
69+
ConnectivityState state = channel.getState(true);
70+
logger.debug("Grpc channel {} new state: {}", endpoint, state);
71+
channel.notifyWhenStateChanged(state, this);
72+
}
11573

116-
@Override
117-
public void run() {
118-
checkState();
119-
}
74+
@Override
75+
public void run() {
76+
checkState();
12077
}
12178
}

core/src/main/java/tech/ydb/core/impl/pool/ManagedChannelFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,4 @@ interface Builder {
1414
}
1515

1616
ManagedChannel newManagedChannel(String host, int port, String authority);
17-
18-
long getConnectTimeoutMs();
1917
}

core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ public class NettyChannelFactory implements ManagedChannelFactory {
3636
private final boolean useTLS;
3737
private final byte[] cert;
3838
private final boolean retryEnabled;
39-
private final long connectTimeoutMs;
4039
private final boolean useDefaultGrpcResolver;
4140
private final Long grpcKeepAliveTimeMillis;
4241

@@ -46,16 +45,10 @@ private NettyChannelFactory(GrpcTransportBuilder builder) {
4645
this.useTLS = builder.getUseTls();
4746
this.cert = builder.getCert();
4847
this.retryEnabled = builder.isEnableRetry();
49-
this.connectTimeoutMs = builder.getConnectTimeoutMillis();
5048
this.useDefaultGrpcResolver = builder.useDefaultGrpcResolver();
5149
this.grpcKeepAliveTimeMillis = builder.getGrpcKeepAliveTimeMillis();
5250
}
5351

54-
@Override
55-
public long getConnectTimeoutMs() {
56-
return this.connectTimeoutMs;
57-
}
58-
5952
@SuppressWarnings("deprecation")
6053
@Override
6154
public ManagedChannel newManagedChannel(String host, int port, String sslHostOverride) {

core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ public class ShadedNettyChannelFactory implements ManagedChannelFactory {
3636
private final boolean useTLS;
3737
private final byte[] cert;
3838
private final boolean retryEnabled;
39-
private final long connectTimeoutMs;
4039
private final boolean useDefaultGrpcResolver;
4140
private final Long grpcKeepAliveTimeMillis;
4241

@@ -46,16 +45,10 @@ public ShadedNettyChannelFactory(GrpcTransportBuilder builder) {
4645
this.useTLS = builder.getUseTls();
4746
this.cert = builder.getCert();
4847
this.retryEnabled = builder.isEnableRetry();
49-
this.connectTimeoutMs = builder.getConnectTimeoutMillis();
5048
this.useDefaultGrpcResolver = builder.useDefaultGrpcResolver();
5149
this.grpcKeepAliveTimeMillis = builder.getGrpcKeepAliveTimeMillis();
5250
}
5351

54-
@Override
55-
public long getConnectTimeoutMs() {
56-
return this.connectTimeoutMs;
57-
}
58-
5952
@SuppressWarnings("deprecation")
6053
@Override
6154
public ManagedChannel newManagedChannel(String host, int port, String sslHostOverride) {

core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import java.io.IOException;
66
import java.nio.file.Files;
77
import java.security.cert.CertificateException;
8-
import java.time.Duration;
9-
import java.util.concurrent.TimeUnit;
108

119
import com.google.common.io.ByteStreams;
1210
import io.grpc.ClientInterceptor;
@@ -79,7 +77,6 @@ public void defaultParams() {
7977
ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder);
8078
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0));
8179

82-
Assert.assertEquals(30_000l, factory.getConnectTimeoutMs());
8380
Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null));
8481

8582
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1));
@@ -100,13 +97,11 @@ public void defaultParams() {
10097
public void defaultSslFactory() {
10198
GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root")
10299
.withSecureConnection()
103-
.withGrpcRetry(true)
104-
.withConnectTimeout(Duration.ofMinutes(1));
100+
.withGrpcRetry(true);
105101

106102
ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder);
107103
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0));
108104

109-
Assert.assertEquals(60000l, factory.getConnectTimeoutMs());
110105
Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null));
111106

112107
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1));
@@ -158,12 +153,10 @@ public void customSslFactory() throws CertificateException, IOException {
158153

159154
GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root")
160155
.withSecureConnection(baos.toByteArray())
161-
.withGrpcRetry(false)
162-
.withConnectTimeout(4, TimeUnit.SECONDS);
156+
.withGrpcRetry(false);
163157

164158
ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder);
165159

166-
Assert.assertEquals(4000l, factory.getConnectTimeoutMs());
167160
Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null));
168161

169162
} finally {

core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelPoolTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ public class GrpcChannelPoolTest {
2222

2323
@Before
2424
public void setUp() {
25-
Mockito.when(factoryMock.getConnectTimeoutMs()).thenReturn(500l); // timeout for ready watcher
2625
Mockito.when(factoryMock.newManagedChannel(Mockito.any(), Mockito.anyInt(), Mockito.isNull()))
2726
.then((args) -> ManagedChannelMock.good());
2827
}

core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java

Lines changed: 0 additions & 92 deletions
This file was deleted.

core/src/test/java/tech/ydb/core/impl/pool/ManagedChannelMock.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,5 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
159159
public ManagedChannel newManagedChannel(String host, int port, String authority) {
160160
return good();
161161
}
162-
163-
@Override
164-
public long getConnectTimeoutMs() {
165-
return builder.getConnectTimeoutMillis();
166-
}
167162
};
168163
}

0 commit comments

Comments
 (0)