diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java index 6f6adf24..b0d3c022 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java @@ -75,7 +75,6 @@ public enum InitMode { private Executor callExecutor = MoreExecutors.directExecutor(); private AuthRpcProvider authProvider = NopAuthProvider.INSTANCE; private long readTimeoutMillis = 0; - private long connectTimeoutMillis = 30_000; private long discoveryTimeoutMillis = 60_000; private boolean useDefaultGrpcResolver = false; private GrpcCompression compression = GrpcCompression.NO_COMPRESSION; @@ -147,8 +146,9 @@ public long getReadTimeoutMillis() { return readTimeoutMillis; } + @Deprecated public long getConnectTimeoutMillis() { - return connectTimeoutMillis; + return 10_000; } public long getDiscoveryTimeoutMillis() { @@ -296,15 +296,13 @@ public GrpcTransportBuilder withReadTimeout(long timeout, TimeUnit unit) { return this; } + @Deprecated public GrpcTransportBuilder withConnectTimeout(Duration timeout) { - this.connectTimeoutMillis = timeout.toMillis(); - Preconditions.checkArgument(connectTimeoutMillis > 0, "connectTimeoutMillis must be greater than 0"); return this; } + @Deprecated public GrpcTransportBuilder withConnectTimeout(long timeout, TimeUnit unit) { - this.connectTimeoutMillis = unit.toMillis(timeout); - Preconditions.checkArgument(connectTimeoutMillis > 0, "connectTimeoutMillis must be greater than 0"); return this; } diff --git a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java index 47b9b3b8..e89fe2a4 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java @@ -1,9 +1,6 @@ package tech.ydb.core.impl.pool; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import io.grpc.Channel; import io.grpc.ConnectivityState; @@ -14,25 +11,21 @@ /** * @author Nikolay Perfilov */ -public class GrpcChannel { +public final class GrpcChannel implements Runnable { + /* Channel shutdown waits for finish of active grpc calls, so there must be enough time to complete them all */ private static final long WAIT_FOR_CLOSING_MS = 5000; private static final Logger logger = LoggerFactory.getLogger(GrpcChannel.class); private final EndpointRecord endpoint; private final ManagedChannel channel; - private final long connectTimeoutMs; - private final ReadyWatcher readyWatcher; public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory) { try { logger.debug("Creating grpc channel with {}", endpoint); this.endpoint = endpoint; - this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort(), - endpoint.getAuthority()); - this.connectTimeoutMs = factory.getConnectTimeoutMs(); - this.readyWatcher = new ReadyWatcher(); - this.readyWatcher.checkState(); + this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort(), endpoint.getAuthority()); + checkState(); } catch (Throwable th) { throw new RuntimeException("cannot create channel", th); } @@ -43,7 +36,7 @@ public EndpointRecord getEndpoint() { } public Channel getReadyChannel() { - return readyWatcher.getReadyChannel(); + return channel; } public boolean isShutdown() { @@ -72,50 +65,14 @@ public boolean shutdown() { } } - private class ReadyWatcher implements Runnable { - private final CompletableFuture future = new CompletableFuture<>(); - - public Channel getReadyChannel() { - try { - return future.get(connectTimeoutMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException ex) { - logger.warn("Grpc channel {} ready waiting is interrupted: ", endpoint, ex); - Thread.currentThread().interrupt(); - } catch (ExecutionException ex) { - logger.warn("Grpc channel {} connecting problem: ", endpoint, ex); - throw new RuntimeException("Channel " + endpoint + " connecting problem", ex); - } catch (TimeoutException ex) { - logger.warn("Grpc channel {} connect timeout exceeded", endpoint); - throw new RuntimeException("Channel " + endpoint + " connecting timeout"); - } - return null; - } - - public void checkState() { - ConnectivityState state = channel.getState(true); - logger.debug("Grpc channel {} new state: {}", endpoint, state); - switch (state) { - case READY: - future.complete(channel); - // keep tracking channel state - channel.notifyWhenStateChanged(state, this); - break; - case SHUTDOWN: - future.completeExceptionally(new IllegalStateException("Grpc channel already closed")); - break; - case TRANSIENT_FAILURE: - case CONNECTING: - case IDLE: - default: - // keep tracking channel state - channel.notifyWhenStateChanged(state, this); - break; - } - } + private void checkState() { + ConnectivityState state = channel.getState(true); + logger.debug("Grpc channel {} new state: {}", endpoint, state); + channel.notifyWhenStateChanged(state, this); + } - @Override - public void run() { - checkState(); - } + @Override + public void run() { + checkState(); } } diff --git a/core/src/main/java/tech/ydb/core/impl/pool/ManagedChannelFactory.java b/core/src/main/java/tech/ydb/core/impl/pool/ManagedChannelFactory.java index 77c8314d..4c63cb18 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/ManagedChannelFactory.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/ManagedChannelFactory.java @@ -14,6 +14,4 @@ interface Builder { } ManagedChannel newManagedChannel(String host, int port, String authority); - - long getConnectTimeoutMs(); } diff --git a/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java b/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java index 48a2e3f2..16e30ee4 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java @@ -36,7 +36,6 @@ public class NettyChannelFactory implements ManagedChannelFactory { private final boolean useTLS; private final byte[] cert; private final boolean retryEnabled; - private final long connectTimeoutMs; private final boolean useDefaultGrpcResolver; private final Long grpcKeepAliveTimeMillis; @@ -46,16 +45,10 @@ private NettyChannelFactory(GrpcTransportBuilder builder) { this.useTLS = builder.getUseTls(); this.cert = builder.getCert(); this.retryEnabled = builder.isEnableRetry(); - this.connectTimeoutMs = builder.getConnectTimeoutMillis(); this.useDefaultGrpcResolver = builder.useDefaultGrpcResolver(); this.grpcKeepAliveTimeMillis = builder.getGrpcKeepAliveTimeMillis(); } - @Override - public long getConnectTimeoutMs() { - return this.connectTimeoutMs; - } - @SuppressWarnings("deprecation") @Override public ManagedChannel newManagedChannel(String host, int port, String sslHostOverride) { diff --git a/core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java b/core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java index 6d777e36..5bfcaf96 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java @@ -36,7 +36,6 @@ public class ShadedNettyChannelFactory implements ManagedChannelFactory { private final boolean useTLS; private final byte[] cert; private final boolean retryEnabled; - private final long connectTimeoutMs; private final boolean useDefaultGrpcResolver; private final Long grpcKeepAliveTimeMillis; @@ -46,16 +45,10 @@ public ShadedNettyChannelFactory(GrpcTransportBuilder builder) { this.useTLS = builder.getUseTls(); this.cert = builder.getCert(); this.retryEnabled = builder.isEnableRetry(); - this.connectTimeoutMs = builder.getConnectTimeoutMillis(); this.useDefaultGrpcResolver = builder.useDefaultGrpcResolver(); this.grpcKeepAliveTimeMillis = builder.getGrpcKeepAliveTimeMillis(); } - @Override - public long getConnectTimeoutMs() { - return this.connectTimeoutMs; - } - @SuppressWarnings("deprecation") @Override public ManagedChannel newManagedChannel(String host, int port, String sslHostOverride) { diff --git a/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java b/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java index dd351f41..e36f17b1 100644 --- a/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java +++ b/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java @@ -5,8 +5,6 @@ import java.io.IOException; import java.nio.file.Files; import java.security.cert.CertificateException; -import java.time.Duration; -import java.util.concurrent.TimeUnit; import com.google.common.io.ByteStreams; import io.grpc.ClientInterceptor; @@ -79,7 +77,6 @@ public void defaultParams() { ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder); channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0)); - Assert.assertEquals(30_000l, factory.getConnectTimeoutMs()); Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null)); channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1)); @@ -100,13 +97,11 @@ public void defaultParams() { public void defaultSslFactory() { GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root") .withSecureConnection() - .withGrpcRetry(true) - .withConnectTimeout(Duration.ofMinutes(1)); + .withGrpcRetry(true); ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder); channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0)); - Assert.assertEquals(60000l, factory.getConnectTimeoutMs()); Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null)); channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1)); @@ -158,12 +153,10 @@ public void customSslFactory() throws CertificateException, IOException { GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root") .withSecureConnection(baos.toByteArray()) - .withGrpcRetry(false) - .withConnectTimeout(4, TimeUnit.SECONDS); + .withGrpcRetry(false); ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder); - Assert.assertEquals(4000l, factory.getConnectTimeoutMs()); Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null)); } finally { diff --git a/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelPoolTest.java b/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelPoolTest.java index 8167fbb4..c2dc06ee 100644 --- a/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelPoolTest.java +++ b/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelPoolTest.java @@ -22,7 +22,6 @@ public class GrpcChannelPoolTest { @Before public void setUp() { - Mockito.when(factoryMock.getConnectTimeoutMs()).thenReturn(500l); // timeout for ready watcher Mockito.when(factoryMock.newManagedChannel(Mockito.any(), Mockito.anyInt(), Mockito.isNull())) .then((args) -> ManagedChannelMock.good()); } diff --git a/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java b/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java deleted file mode 100644 index 4ed09275..00000000 --- a/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java +++ /dev/null @@ -1,92 +0,0 @@ -package tech.ydb.core.impl.pool; - - -import io.grpc.ConnectivityState; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import tech.ydb.core.grpc.GrpcTransportBuilder; - - -/** - * - * @author Aleksandr Gorshenin - */ -public class GrpcChannelTest { - private final ManagedChannelFactory factoryMock = Mockito.mock(ManagedChannelFactory.class); - private final GrpcTransportBuilder builderMock = Mockito.mock(GrpcTransportBuilder.class); - - @Before - public void setUp() { - Mockito.when(builderMock.getManagedChannelFactory()).thenReturn(factoryMock); - Mockito.when(factoryMock.getConnectTimeoutMs()).thenReturn(500l); // timeout for ready watcher - } - - @Test - public void goodChannels() { - Mockito.when(factoryMock.newManagedChannel(Mockito.any(), Mockito.anyInt(), Mockito.isNull())) - .thenReturn(ManagedChannelMock.good(), ManagedChannelMock.good()); - - EndpointRecord endpoint = new EndpointRecord("host1", 1234); - - GrpcChannel channel = new GrpcChannel(endpoint, factoryMock); - Assert.assertEquals(endpoint, channel.getEndpoint()); - Assert.assertNotNull(channel.getReadyChannel()); - channel.shutdown(); - channel.shutdown(); // double shutdown is ok - } - - @Test - public void slowChannels() { - ConnectivityState[] states = new ConnectivityState[] { - ConnectivityState.CONNECTING, - ConnectivityState.TRANSIENT_FAILURE, - ConnectivityState.IDLE, - ConnectivityState.CONNECTING, - ConnectivityState.TRANSIENT_FAILURE, - ConnectivityState.CONNECTING, - ConnectivityState.TRANSIENT_FAILURE, - ConnectivityState.CONNECTING, - ConnectivityState.READY, - }; - - Mockito.when(factoryMock.newManagedChannel(Mockito.any(), Mockito.anyInt(), Mockito.isNull())) - .thenReturn(new ManagedChannelMock(ConnectivityState.IDLE).nextStates(states)) - .thenReturn(new ManagedChannelMock(ConnectivityState.IDLE).nextStates(states)); - - EndpointRecord endpoint = new EndpointRecord("host1", 1234); - - GrpcChannel channel = new GrpcChannel(endpoint, factoryMock); - Assert.assertEquals(endpoint, channel.getEndpoint()); - Assert.assertNotNull(channel.getReadyChannel()); - channel.shutdown(); - } - - @Test - public void badChannels() { - ConnectivityState[] states = new ConnectivityState[] { - ConnectivityState.CONNECTING, - ConnectivityState.TRANSIENT_FAILURE, - ConnectivityState.CONNECTING, - ConnectivityState.TRANSIENT_FAILURE, - ConnectivityState.SHUTDOWN, - }; - - Mockito.when(factoryMock.newManagedChannel(Mockito.any(), Mockito.anyInt(), Mockito.isNull())) - .thenReturn(new ManagedChannelMock(ConnectivityState.IDLE).nextStates(states)) - .thenReturn(new ManagedChannelMock(ConnectivityState.IDLE).nextStates(states)); - - EndpointRecord endpoint = new EndpointRecord("host1", 1234); - - GrpcChannel channel = new GrpcChannel(endpoint, factoryMock); - Assert.assertEquals(endpoint, channel.getEndpoint()); - - RuntimeException ex1 = Assert.assertThrows(RuntimeException.class, channel::getReadyChannel); - Assert.assertEquals("Channel Endpoint{host=host1, port=1234, node=0, location=null, overrideAuthority=null} connecting problem", - ex1.getMessage()); - - channel.shutdown(); - } -} diff --git a/core/src/test/java/tech/ydb/core/impl/pool/ManagedChannelMock.java b/core/src/test/java/tech/ydb/core/impl/pool/ManagedChannelMock.java index 46b342ea..cc69628f 100644 --- a/core/src/test/java/tech/ydb/core/impl/pool/ManagedChannelMock.java +++ b/core/src/test/java/tech/ydb/core/impl/pool/ManagedChannelMock.java @@ -159,10 +159,5 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE public ManagedChannel newManagedChannel(String host, int port, String authority) { return good(); } - - @Override - public long getConnectTimeoutMs() { - return builder.getConnectTimeoutMillis(); - } }; }