Skip to content

Commit 54a7efd

Browse files
poorbarcode authored and lhotari committed
[fix][client] Orphan producer when concurrently calling producer closing and reconnection (#23853)
(cherry picked from commit 56adefa)
1 parent 16da87f commit 54a7efd

File tree

5 files changed

+197
-52
lines changed

5 files changed

+197
-52
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import java.time.Duration;
2122
import lombok.Cleanup;
2223
import org.apache.commons.lang3.RandomStringUtils;
2324
import org.apache.pulsar.broker.service.Topic;
@@ -71,6 +72,46 @@ public Object[][] produceConf() {
7172
};
7273
}
7374

75+
/**
76+
* Param1: brokenPipeline - if true, the client channel is closed after the producer starts closing;
77+
* if false, auto-read is re-enabled on the channel instead.
78+
*/
79+
@DataProvider(name = "brokenPipeline")
80+
public Object[][] brokenPipeline() {
81+
return new Object[][]{
82+
{true},
83+
{false}
84+
};
85+
}
86+
87+
@Test(dataProvider = "brokenPipeline")
88+
public void testProducerCloseCallback2(boolean brokenPipeline) throws Exception {
89+
initClient();
90+
@Cleanup
91+
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
92+
.topic("testProducerClose")
93+
.sendTimeout(5, TimeUnit.SECONDS)
94+
.maxPendingMessages(0)
95+
.enableBatching(false)
96+
.create();
97+
final TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
98+
final TypedMessageBuilder<byte[]> value = messageBuilder.value("test-msg".getBytes(StandardCharsets.UTF_8));
99+
producer.getClientCnx().channel().config().setAutoRead(false);
100+
final CompletableFuture<MessageId> completableFuture = value.sendAsync();
101+
producer.closeAsync();
102+
Thread.sleep(3000);
103+
if (brokenPipeline) {
104+
//producer.getClientCnx().channel().config().setAutoRead(true);
105+
producer.getClientCnx().channel().close();
106+
} else {
107+
producer.getClientCnx().channel().config().setAutoRead(true);
108+
}
109+
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
110+
System.out.println(1);
111+
Assert.assertTrue(completableFuture.isDone());
112+
});
113+
}
114+
74115
@Test(timeOut = 10_000)
75116
public void testProducerCloseCallback() throws Exception {
76117
initClient();
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl;
20+
21+
import static org.mockito.Mockito.doAnswer;
22+
import static org.mockito.Mockito.spy;
23+
import static org.testng.Assert.assertEquals;
24+
import static org.testng.Assert.assertNotEquals;
25+
import static org.testng.Assert.assertTrue;
26+
import java.nio.charset.StandardCharsets;
27+
import java.util.Optional;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import lombok.extern.slf4j.Slf4j;
33+
import org.apache.pulsar.broker.BrokerTestUtil;
34+
import org.apache.pulsar.broker.service.ServerCnx;
35+
import org.apache.pulsar.client.api.Producer;
36+
import org.apache.pulsar.client.api.ProducerConsumerBase;
37+
import org.apache.pulsar.client.api.Schema;
38+
import org.awaitility.Awaitility;
39+
import org.testng.annotations.AfterClass;
40+
import org.testng.annotations.BeforeClass;
41+
import org.testng.annotations.Test;
42+
43+
@Slf4j
44+
@Test(groups = "broker-api")
45+
public class ProducerReconnectionTest extends ProducerConsumerBase {
46+
47+
@BeforeClass(alwaysRun = true)
48+
@Override
49+
protected void setup() throws Exception {
50+
super.internalSetup();
51+
super.producerBaseSetup();
52+
}
53+
54+
@AfterClass(alwaysRun = true)
55+
@Override
56+
protected void cleanup() throws Exception {
57+
super.internalCleanup();
58+
}
59+
60+
@Test
61+
public void testConcurrencyReconnectAndClose() throws Exception {
62+
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
63+
admin.topics().createNonPartitionedTopic(topicName);
64+
PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
65+
66+
// Create producer which will run with special steps.
67+
ProducerBuilderImpl<byte[]> producerBuilder = (ProducerBuilderImpl<byte[]>) client.newProducer()
68+
.blockIfQueueFull(false).maxPendingMessages(1).producerName("p1")
69+
.enableBatching(true).topic(topicName);
70+
CompletableFuture<Producer<byte[]>> producerFuture = new CompletableFuture<>();
71+
AtomicBoolean reconnectionStartTrigger = new AtomicBoolean();
72+
CountDownLatch reconnectingSignal = new CountDownLatch(1);
73+
CountDownLatch closedSignal = new CountDownLatch(1);
74+
ProducerImpl<byte[]> producer = new ProducerImpl<>(client, topicName, producerBuilder.getConf(), producerFuture,
75+
-1, Schema.BYTES, null, Optional.empty()) {
76+
@Override
77+
ConnectionHandler initConnectionHandler() {
78+
ConnectionHandler connectionHandler = super.initConnectionHandler();
79+
ConnectionHandler spyConnectionHandler = spy(connectionHandler);
80+
doAnswer(invocation -> {
81+
boolean result = (boolean) invocation.callRealMethod();
82+
if (reconnectionStartTrigger.get()) {
83+
log.info("[testConcurrencyReconnectAndClose] verified state for reconnection");
84+
reconnectingSignal.countDown();
85+
closedSignal.await();
86+
log.info("[testConcurrencyReconnectAndClose] reconnected");
87+
}
88+
return result;
89+
}).when(spyConnectionHandler).isValidStateForReconnection();
90+
return spyConnectionHandler;
91+
}
92+
};
93+
log.info("[testConcurrencyReconnectAndClose] producer created");
94+
producerFuture.get(5, TimeUnit.SECONDS);
95+
96+
// Reconnect.
97+
log.info("[testConcurrencyReconnectAndClose] trigger a reconnection");
98+
ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService().getTopic(topicName, false).join()
99+
.get().getProducers().values().iterator().next().getCnx();
100+
reconnectionStartTrigger.set(true);
101+
serverCnx.ctx().close();
102+
producer.sendAsync("1".getBytes(StandardCharsets.UTF_8));
103+
Awaitility.await().untilAsserted(() -> {
104+
assertNotEquals(producer.getPendingQueueSize(), 0);
105+
});
106+
107+
// Close producer when reconnecting.
108+
reconnectingSignal.await();
109+
log.info("[testConcurrencyReconnectAndClose] producer close");
110+
producer.closeAsync();
111+
Awaitility.await().untilAsserted(() -> {
112+
HandlerState.State state1 = producer.getState();
113+
assertTrue(state1 == HandlerState.State.Closed || state1 == HandlerState.State.Closing);
114+
});
115+
// Release the reconnection thread that is blocked on "closedSignal.await()".
116+
closedSignal.countDown();
117+
118+
// Wait for reconnection.
119+
Thread.sleep(3000);
120+
121+
HandlerState.State state2 = producer.getState();
122+
log.info("producer state: {}", state2);
123+
assertTrue(state2 == HandlerState.State.Closed || state2 == HandlerState.State.Closing);
124+
assertEquals(producer.getPendingQueueSize(), 0);
125+
126+
// Cleanup: close the producer and delete the topic.
127+
producer.close();
128+
admin.topics().delete(topicName);
129+
}
130+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import java.net.InetSocketAddress;
2223
import java.net.URI;
2324
import java.util.Optional;
@@ -192,13 +193,12 @@ public void connectionClosed(ClientCnx cnx, Optional<Long> initialConnectionDela
192193
duringConnect.set(false);
193194
state.client.getCnxPool().releaseConnection(cnx);
194195
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
195-
if (!isValidStateForReconnection()) {
196+
if (!state.changeToConnecting()) {
196197
log.info("[{}] [{}] Ignoring reconnection request (state: {})",
197198
state.topic, state.getHandlerName(), state.getState());
198199
return;
199200
}
200201
long delayMs = initialConnectionDelayMs.orElse(backoff.next());
201-
state.setState(State.Connecting);
202202
log.info("[{}] [{}] Closed connection {} -- Will try again in {} s, hostUrl: {}",
203203
state.topic, state.getHandlerName(), cnx.channel(), delayMs / 1000.0, hostUrl.orElse(null));
204204
state.client.timer().newTimeout(timeout -> {
@@ -232,7 +232,8 @@ protected long switchClientCnx(ClientCnx clientCnx) {
232232
return EPOCH_UPDATER.incrementAndGet(this);
233233
}
234234

235-
private boolean isValidStateForReconnection() {
235+
@VisibleForTesting
236+
public boolean isValidStateForReconnection() {
236237
State state = this.state.getState();
237238
switch (state) {
238239
case Uninitialized:

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -303,16 +303,20 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
303303
producersClosedCounter = ip.newCounter("pulsar.client.producer.closed", Unit.Sessions,
304304
"The number of producer sessions closed", topic, Attributes.empty());
305305

306-
this.connectionHandler = new ConnectionHandler(this,
306+
this.connectionHandler = initConnectionHandler();
307+
setChunkMaxMessageSize();
308+
grabCnx();
309+
producersOpenedCounter.increment();
310+
}
311+
312+
ConnectionHandler initConnectionHandler() {
313+
return new ConnectionHandler(this,
307314
new BackoffBuilder()
308315
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
309316
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
310317
.setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
311318
.create(),
312-
this);
313-
setChunkMaxMessageSize();
314-
grabCnx();
315-
producersOpenedCounter.increment();
319+
this);
316320
}
317321

318322
private void setChunkMaxMessageSize() {
@@ -1151,7 +1155,7 @@ public CompletableFuture<Void> handleOnce() {
11511155

11521156

11531157
@Override
1154-
public CompletableFuture<Void> closeAsync() {
1158+
public synchronized CompletableFuture<Void> closeAsync() {
11551159
final State currentState = getAndUpdateState(state -> {
11561160
if (state == State.Closed) {
11571161
return state;
@@ -1179,11 +1183,11 @@ public CompletableFuture<Void> closeAsync() {
11791183
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
11801184
cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
11811185
cnx.removeProducer(producerId);
1182-
closeAndClearPendingMessages();
11831186
if (exception == null || !cnx.ctx().channel().isActive()) {
11841187
// Either we've received the success response for the close producer command from the broker, or the
11851188
// connection did break in the meantime. In any case, the producer is gone.
11861189
log.info("[{}] [{}] Closed Producer", topic, producerName);
1190+
closeAndClearPendingMessages();
11871191
closeFuture.complete(null);
11881192
} else {
11891193
closeFuture.completeExceptionally(exception);
@@ -1795,6 +1799,12 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
17951799
// Because the state could have been updated while retrieving the connection, we set it back to connecting,
17961800
// as long as the change from current state to connecting is a valid state change.
17971801
if (!changeToConnecting()) {
1802+
if (getState() == State.Closing || getState() == State.Closed) {
1803+
// Producer was closed while reconnecting, fail and clear the pending messages with a
1804+
// ProducerFencedException rather than leaving them in the pending queue
1805+
failPendingMessages(cnx,
1806+
new PulsarClientException.ProducerFencedException("producer has been closed"));
1807+
}
17981808
return CompletableFuture.completedFuture(null);
17991809
}
18001810
// We set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating
@@ -1855,6 +1865,8 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
18551865
// Producer was closed while reconnecting, close the connection to make sure the broker
18561866
// drops the producer on its side
18571867
cnx.removeProducer(producerId);
1868+
failPendingMessages(cnx,
1869+
new PulsarClientException.ProducerFencedException("producer has been closed"));
18581870
cnx.channel().close();
18591871
future.complete(null);
18601872
return;
@@ -2025,7 +2037,7 @@ private void closeProducerTasks() {
20252037

20262038
private void resendMessages(ClientCnx cnx, long expectedEpoch) {
20272039
cnx.ctx().channel().eventLoop().execute(() -> {
2028-
synchronized (this) {
2040+
synchronized (ProducerImpl.this) {
20292041
if (getState() == State.Closing || getState() == State.Closed) {
20302042
// Producer was closed while reconnecting, close the connection to make sure the broker
20312043
// drops the producer on its side
@@ -2181,7 +2193,7 @@ public void run(Timeout timeout) throws Exception {
21812193
* This fails and clears the pending messages with the given exception. This method should be called from within the
21822194
* ProducerImpl object mutex.
21832195
*/
2184-
private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
2196+
private synchronized void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
21852197
if (cnx == null) {
21862198
final AtomicInteger releaseCount = new AtomicInteger();
21872199
final boolean batchMessagingEnabled = isBatchMessagingEnabled();
@@ -2333,7 +2345,7 @@ private void batchMessageAndSend(boolean shouldScheduleNextBatchFlush) {
23332345
}
23342346
}
23352347

2336-
protected void processOpSendMsg(OpSendMsg op) {
2348+
protected synchronized void processOpSendMsg(OpSendMsg op) {
23372349
if (op == null) {
23382350
return;
23392351
}

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,8 @@
2525
import static org.testng.Assert.*;
2626

2727
import java.nio.ByteBuffer;
28-
import java.util.Optional;
29-
import java.util.concurrent.CompletableFuture;
30-
31-
import io.netty.util.HashedWheelTimer;
32-
import org.apache.pulsar.client.api.PulsarClientException;
3328
import org.apache.pulsar.client.api.Schema;
34-
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
35-
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
3629
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
37-
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
3830
import org.apache.pulsar.common.api.proto.MessageMetadata;
3931
import org.mockito.Mockito;
4032
import org.testng.annotations.Test;
@@ -74,35 +66,4 @@ public void testPopulateMessageSchema() {
7466
assertTrue(producer.populateMessageSchema(msg, null));
7567
verify(msg).setSchemaState(MessageImpl.SchemaState.Ready);
7668
}
77-
78-
@Test
79-
public void testClearPendingMessageWhenCloseAsync() {
80-
PulsarClientImpl client = mock(PulsarClientImpl.class);
81-
Mockito.doReturn(1L).when(client).newProducerId();
82-
ClientConfigurationData clientConf = new ClientConfigurationData();
83-
clientConf.setStatsIntervalSeconds(-1);
84-
Mockito.doReturn(clientConf).when(client).getConfiguration();
85-
Mockito.doReturn(new InstrumentProvider(null)).when(client).instrumentProvider();
86-
ConnectionPool connectionPool = mock(ConnectionPool.class);
87-
Mockito.doReturn(1).when(connectionPool).genRandomKeyToSelectCon();
88-
Mockito.doReturn(connectionPool).when(client).getCnxPool();
89-
HashedWheelTimer timer = mock(HashedWheelTimer.class);
90-
Mockito.doReturn(null).when(timer).newTimeout(Mockito.any(), Mockito.anyLong(), Mockito.any());
91-
Mockito.doReturn(timer).when(client).timer();
92-
ProducerConfigurationData producerConf = new ProducerConfigurationData();
93-
producerConf.setSendTimeoutMs(-1);
94-
ProducerImpl<?> producer = Mockito.spy(new ProducerImpl<>(client, "topicName", producerConf, null, 0, null, null, Optional.empty()));
95-
96-
// make sure throw exception when send request to broker
97-
ClientCnx clientCnx = mock(ClientCnx.class);
98-
CompletableFuture<ProducerResponse> tCompletableFuture = new CompletableFuture<>();
99-
tCompletableFuture.completeExceptionally(new PulsarClientException("error"));
100-
when(clientCnx.sendRequestWithId(Mockito.any(), Mockito.anyLong())).thenReturn(tCompletableFuture);
101-
Mockito.doReturn(clientCnx).when(producer).cnx();
102-
103-
// run closeAsync and verify
104-
CompletableFuture<Void> voidCompletableFuture = producer.closeAsync();
105-
verify(producer).closeAndClearPendingMessages();
106-
}
107-
10869
}

0 commit comments

Comments
 (0)