Skip to content

Commit 3abe6bd

Browse files
committed
SOLR-17804: re-create SolrClient if closed. (apache#3411)
1 parent cba01d9 commit 3abe6bd

File tree

7 files changed

+119
-30
lines changed

7 files changed

+119
-30
lines changed

solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java

Lines changed: 77 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.util.concurrent.Future;
3232
import java.util.concurrent.ThreadPoolExecutor;
3333
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.atomic.AtomicReference;
35+
import java.util.function.Supplier;
3436
import org.apache.kafka.clients.consumer.ConsumerConfig;
3537
import org.apache.kafka.clients.consumer.ConsumerRecord;
3638
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -85,7 +87,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
8587
private final int maxCollapseRecords;
8688
private final SolrMessageProcessor messageProcessor;
8789

88-
protected final CloudSolrClient solrClient;
90+
protected SolrClientSupplier solrClientSupplier;
8991

9092
private final ThreadPoolExecutor executor;
9193

@@ -95,6 +97,68 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
9597

9698
private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
9799

100+
/**
101+
* Supplier for creating and managing a working CloudSolrClient instance. This class ensures that
102+
* the CloudSolrClient instance doesn't try to use its {@link
103+
* org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider} in a closed state, which may
104+
* happen if e.g. the ZooKeeper connection is lost. When this happens the CloudSolrClient is
105+
* re-created to ensure it can continue to function properly.
106+
*
107+
* <p>TODO: this functionality should be moved to the CloudSolrClient itself.
108+
*/
109+
public static class SolrClientSupplier implements Supplier<CloudSolrClient>, AutoCloseable {
110+
private final AtomicReference<CloudSolrClient> solrClientRef = new AtomicReference<>();
111+
private final String zkConnectString;
112+
113+
public SolrClientSupplier(String zkConnectString) {
114+
this.zkConnectString = zkConnectString;
115+
}
116+
117+
@Override
118+
public void close() {
119+
IOUtils.closeQuietly(solrClientRef.get());
120+
}
121+
122+
protected CloudSolrClient createSolrClient() {
123+
log.debug("Creating new SolrClient...");
124+
return new CloudSolrClient.Builder(
125+
Collections.singletonList(zkConnectString), Optional.empty())
126+
.build();
127+
}
128+
129+
@Override
130+
public CloudSolrClient get() {
131+
CloudSolrClient existingClient = solrClientRef.get();
132+
if (existingClient == null) {
133+
synchronized (solrClientRef) {
134+
if (solrClientRef.get() == null) {
135+
log.info("Initializing Solr client.");
136+
solrClientRef.set(createSolrClient());
137+
}
138+
return solrClientRef.get();
139+
}
140+
}
141+
if (existingClient.getClusterStateProvider().isClosed()) {
142+
// lock out other threads and re-open the client if its ClusterStateProvider was closed
143+
synchronized (solrClientRef) {
144+
// refresh and check again
145+
existingClient = solrClientRef.get();
146+
if (existingClient.getClusterStateProvider().isClosed()) {
147+
log.info("Re-creating Solr client because its ClusterStateProvider was closed.");
148+
CloudSolrClient newClient = createSolrClient();
149+
solrClientRef.set(newClient);
150+
IOUtils.closeQuietly(existingClient);
151+
return newClient;
152+
} else {
153+
return existingClient;
154+
}
155+
}
156+
} else {
157+
return existingClient;
158+
}
159+
}
160+
}
161+
98162
/**
99163
* @param conf The Kafka consumer configuration
100164
* @param startLatch To inform the caller when the Consumer has started
@@ -157,7 +221,7 @@ public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
157221
r -> new Thread(r, "KafkaCrossDcConsumerWorker"));
158222
executor.prestartAllCoreThreads();
159223

160-
solrClient = createSolrClient(conf);
224+
solrClientSupplier = createSolrClientSupplier(conf);
161225

162226
messageProcessor = createSolrMessageProcessor();
163227

@@ -170,15 +234,23 @@ public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
170234
log.info("Created Kafka resubmit producer");
171235
}
172236

237+
protected SolrClientSupplier createSolrClientSupplier(KafkaCrossDcConf conf) {
238+
return new SolrClientSupplier(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING));
239+
}
240+
173241
protected SolrMessageProcessor createSolrMessageProcessor() {
174-
return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
242+
return new SolrMessageProcessor(solrClientSupplier, resubmitRequest -> 0L);
175243
}
176244

177245
public KafkaConsumer<String, MirroredSolrRequest<?>> createKafkaConsumer(Properties properties) {
178246
return new KafkaConsumer<>(
179247
properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
180248
}
181249

250+
protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
251+
return new KafkaMirroringSink(conf);
252+
}
253+
182254
/**
183255
* This is where the magic happens.
184256
*
@@ -219,7 +291,7 @@ public void run() {
219291
log.warn("Failed to close kafka mirroring sink", e);
220292
}
221293
} finally {
222-
IOUtils.closeQuietly(solrClient);
294+
IOUtils.closeQuietly(solrClientSupplier);
223295
}
224296
}
225297

@@ -511,7 +583,7 @@ public final void shutdown() {
511583
offsetCheckExecutor.shutdown();
512584
offsetCheckExecutor.awaitTermination(30, TimeUnit.SECONDS);
513585
}
514-
solrClient.close();
586+
IOUtils.closeQuietly(solrClientSupplier);
515587
} catch (InterruptedException e) {
516588
Thread.currentThread().interrupt();
517589
log.warn("Interrupted while waiting for executor to shutdown");
@@ -521,15 +593,4 @@ public final void shutdown() {
521593
Util.logMetrics(metrics);
522594
}
523595
}
524-
525-
protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
526-
return new CloudSolrClient.Builder(
527-
Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
528-
Optional.empty())
529-
.build();
530-
}
531-
532-
protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
533-
return new KafkaMirroringSink(conf);
534-
}
535596
}

solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.function.Supplier;
2627
import org.apache.solr.client.solrj.SolrRequest;
2728
import org.apache.solr.client.solrj.impl.CloudSolrClient;
2829
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -59,13 +60,14 @@ public class SolrMessageProcessor extends MessageProcessor
5960
private final MetricRegistry metrics =
6061
SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
6162

62-
final CloudSolrClient client;
63+
final Supplier<CloudSolrClient> clientSupplier;
6364

6465
private static final String VERSION_FIELD = "_version_";
6566

66-
public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
67+
public SolrMessageProcessor(
68+
Supplier<CloudSolrClient> clientSupplier, ResubmitBackoffPolicy resubmitBackoffPolicy) {
6769
super(resubmitBackoffPolicy);
68-
this.client = client;
70+
this.clientSupplier = clientSupplier;
6971
}
7072

7173
@Override
@@ -190,14 +192,14 @@ private Result<MirroredSolrRequest<?>> processMirroredSolrRequest(
190192
if (log.isDebugEnabled()) {
191193
log.debug(
192194
"Sending request to Solr at ZK address={} with params {}",
193-
ZkStateReader.from(client).getZkClient().getZkServerAddress(),
195+
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
194196
request.getParams());
195197
}
196198
Result<MirroredSolrRequest<?>> result;
197199
SolrResponseBase response;
198200
Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(), "outputTime")).time();
199201
try {
200-
response = (SolrResponseBase) request.process(client);
202+
response = (SolrResponseBase) request.process(clientSupplier.get());
201203
} finally {
202204
ctx.stop();
203205
}
@@ -216,7 +218,7 @@ private Result<MirroredSolrRequest<?>> processMirroredSolrRequest(
216218
if (log.isDebugEnabled()) {
217219
log.debug(
218220
"Finished sending request to Solr at ZK address={} with params {} status_code={}",
219-
ZkStateReader.from(client).getZkClient().getZkServerAddress(),
221+
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
220222
request.getParams(),
221223
status);
222224
}
@@ -353,7 +355,7 @@ private void connectToSolrIfNeeded() {
353355
boolean connected = false;
354356
while (!connected) {
355357
try {
356-
client.connect(); // volatile null-check if already connected
358+
clientSupplier.get().connect(); // volatile null-check if already connected
357359
connected = true;
358360
} catch (Exception e) {
359361
log.error("Unable to connect to solr server. Not consuming.", e);

solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ protected CrossDcConsumer getCrossDcConsumer(
165165
return new KafkaCrossDcConsumer(conf, startLatch) {
166166
@Override
167167
protected SolrMessageProcessor createSolrMessageProcessor() {
168-
return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L) {
168+
return new SolrMessageProcessor(solrClientSupplier, resubmitRequest -> 0L) {
169169
@Override
170170
public Result<MirroredSolrRequest<?>> handleItem(
171171
MirroredSolrRequest<?> mirroredSolrRequest) {

solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public static void beforeSimpleSolrIntegrationTest() throws Exception {
5151

5252
CloudSolrClient cloudClient1 = cluster1.getSolrClient();
5353

54-
processor = new SolrMessageProcessor(cloudClient1, null);
54+
processor = new SolrMessageProcessor(() -> cloudClient1, null);
5555

5656
CollectionAdminRequest.Create create =
5757
CollectionAdminRequest.createCollection(COLLECTION, 1, 1);

solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,14 @@
4343
import java.util.concurrent.CountDownLatch;
4444
import java.util.concurrent.ExecutorService;
4545
import java.util.concurrent.TimeUnit;
46+
import java.util.concurrent.atomic.AtomicInteger;
4647
import org.apache.kafka.clients.consumer.ConsumerRecord;
4748
import org.apache.kafka.clients.consumer.ConsumerRecords;
4849
import org.apache.kafka.clients.consumer.KafkaConsumer;
4950
import org.apache.kafka.common.TopicPartition;
5051
import org.apache.kafka.common.errors.WakeupException;
5152
import org.apache.solr.client.solrj.impl.CloudSolrClient;
53+
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
5254
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
5355
import org.apache.solr.client.solrj.request.UpdateRequest;
5456
import org.apache.solr.common.SolrInputDocument;
@@ -74,6 +76,10 @@ public class KafkaCrossDcConsumerTest {
7476
private KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumerMock;
7577
private CloudSolrClient solrClientMock;
7678
private KafkaMirroringSink kafkaMirroringSinkMock;
79+
private ClusterStateProvider clusterStateProviderMock;
80+
private KafkaCrossDcConsumer.SolrClientSupplier supplier;
81+
private AtomicInteger solrClientCounter = new AtomicInteger(0);
82+
private boolean clusterStateProviderIsClosed = false;
7783

7884
private SolrMessageProcessor messageProcessorMock;
7985

@@ -87,10 +93,22 @@ public static void ensureWorkingMockito() {
8793
@Before
8894
public void setUp() {
8995
kafkaConsumerMock = mock(KafkaConsumer.class);
96+
clusterStateProviderMock = mock(ClusterStateProvider.class);
97+
doAnswer(inv -> clusterStateProviderIsClosed).when(clusterStateProviderMock).isClosed();
9098
solrClientMock = mock(CloudSolrClient.class);
99+
doReturn(clusterStateProviderMock).when(solrClientMock).getClusterStateProvider();
91100
kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
92101
messageProcessorMock = mock(SolrMessageProcessor.class);
93102
conf = testCrossDCConf();
103+
supplier =
104+
new KafkaCrossDcConsumer.SolrClientSupplier(null) {
105+
@Override
106+
protected CloudSolrClient createSolrClient() {
107+
solrClientCounter.incrementAndGet();
108+
return solrClientMock;
109+
}
110+
};
111+
94112
// Set necessary configurations
95113

96114
kafkaCrossDcConsumer =
@@ -107,8 +125,8 @@ public SolrMessageProcessor createSolrMessageProcessor() {
107125
}
108126

109127
@Override
110-
protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
111-
return solrClientMock;
128+
protected SolrClientSupplier createSolrClientSupplier(KafkaCrossDcConf conf) {
129+
return supplier;
112130
}
113131

114132
@Override
@@ -187,6 +205,15 @@ protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
187205
assertEquals(1, startLatch.getCount());
188206
}
189207

208+
@Test
209+
public void testSolrClientSupplier() throws Exception {
210+
supplier.get();
211+
assertEquals(1, solrClientCounter.get());
212+
clusterStateProviderIsClosed = true;
213+
supplier.get();
214+
assertEquals(2, solrClientCounter.get());
215+
}
216+
190217
@Test
191218
public void testRunAndShutdown() throws Exception {
192219
// Define the expected behavior of the mocks and set up the test scenario
@@ -220,7 +247,6 @@ public void testRunAndShutdown() throws Exception {
220247

221248
// Verify that the appropriate methods were called on the mocks
222249
verify(kafkaConsumerMock).wakeup();
223-
verify(solrClientMock).close();
224250

225251
consumerThreadExecutor.shutdown();
226252
consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);

solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public static void ensureWorkingMockito() {
5555
public void setUp() {
5656
client = mock(CloudSolrClient.class);
5757
resubmitBackoffPolicy = mock(ResubmitBackoffPolicy.class);
58-
solrMessageProcessor = new SolrMessageProcessor(client, resubmitBackoffPolicy);
58+
solrMessageProcessor = new SolrMessageProcessor(() -> client, resubmitBackoffPolicy);
5959
}
6060

6161
/** Should handle MirroredSolrRequest and return a failed result with no retry */

solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public static void ensureWorkingMockito() {
7070
public void setUp() {
7171
MockitoAnnotations.initMocks(this);
7272

73-
processor = Mockito.spy(new SolrMessageProcessor(solrClient, backoffPolicy));
73+
processor = Mockito.spy(new SolrMessageProcessor(() -> solrClient, backoffPolicy));
7474
Mockito.doNothing().when(processor).uncheckedSleep(anyLong());
7575
}
7676

0 commit comments

Comments
 (0)