Skip to content

SOLR-17804: re-create SolrClient if closed. #3411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jul 9, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions solr/cross-dc-manager/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies {
implementation libs.dropwizard.metrics.servlets
implementation libs.eclipse.jetty.server
implementation libs.eclipse.jetty.ee10.servlet
implementation libs.google.guava
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this anymore

implementation libs.slf4j.api
runtimeOnly libs.google.protobuf.javautils
runtimeOnly libs.commonscodec.commonscodec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Arrays;
Expand All @@ -31,6 +32,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -85,7 +88,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final int maxCollapseRecords;
private final SolrMessageProcessor messageProcessor;

protected final CloudSolrClient solrClient;
protected SolrClientSupplier solrClientSupplier;

private final ThreadPoolExecutor executor;

Expand All @@ -95,6 +98,59 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {

private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);

public static class SolrClientSupplier implements Supplier<CloudSolrClient>, AutoCloseable {
private final AtomicReference<CloudSolrClient> solrClientRef = new AtomicReference<>();
private final String zkConnectString;

public SolrClientSupplier(String zkConnectString) {
this.zkConnectString = zkConnectString;
}

@Override
public void close() {
IOUtils.closeQuietly(solrClientRef.get());
}

protected CloudSolrClient createSolrClient() {
log.info("-- creating new SolrClient...");
return new CloudSolrClient.Builder(
Collections.singletonList(zkConnectString), Optional.empty())
.build();
}

@Override
public CloudSolrClient get() {
CloudSolrClient existingClient = solrClientRef.get();
if (existingClient == null) {
synchronized (solrClientRef) {
if (solrClientRef.get() == null) {
log.info("- initializing Solr client");
solrClientRef.set(createSolrClient());
}
return solrClientRef.get();
}
}
if (existingClient.getClusterStateProvider().isClosed()) {
// lock out other threads and re-open the client if its ClusterStateProvider was closed
synchronized (solrClientRef) {
// refresh and check again
existingClient = solrClientRef.get();
if (existingClient.getClusterStateProvider().isClosed()) {
log.info("- re-creating Solr client because its CSP was closed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make more normal log messages without -s and not abbreviating things like "CSP"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies to all added log lines.

CloudSolrClient newClient = createSolrClient();
solrClientRef.set(newClient);
IOUtils.closeQuietly(existingClient);
return newClient;
} else {
return existingClient;
}
}
} else {
return existingClient;
}
}
}

/**
* @param conf The Kafka consumer configuration
* @param startLatch To inform the caller when the Consumer has started
Expand Down Expand Up @@ -157,7 +213,7 @@ public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
r -> new Thread(r, "KafkaCrossDcConsumerWorker"));
executor.prestartAllCoreThreads();

solrClient = createSolrClient(conf);
solrClientSupplier = createSolrClientSupplier(conf);

messageProcessor = createSolrMessageProcessor();

Expand All @@ -170,15 +226,28 @@ public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
log.info("Created Kafka resubmit producer");
}

/**
 * Builds the supplier that manages the lifecycle of the {@link CloudSolrClient}; overridable for
 * testing.
 *
 * @param conf consumer configuration providing the ZooKeeper connect string
 */
protected SolrClientSupplier createSolrClientSupplier(KafkaCrossDcConf conf) {
  final String zkConnectString = conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
  return new SolrClientSupplier(zkConnectString);
}

/**
 * Creates the message processor used to apply mirrored requests to Solr; overridable for testing.
 * Uses a zero-delay resubmit backoff policy.
 */
protected SolrMessageProcessor createSolrMessageProcessor() {
  // Zero backoff: resubmitted requests are retried without any delay.
  return new SolrMessageProcessor(solrClientSupplier, resubmitRequest -> 0L);
}

/**
 * Creates the Kafka consumer for mirrored Solr requests; overridable for testing.
 *
 * @param properties Kafka consumer configuration properties
 */
public KafkaConsumer<String, MirroredSolrRequest<?>> createKafkaConsumer(Properties properties) {
  StringDeserializer keyDeserializer = new StringDeserializer();
  MirroredSolrRequestSerializer valueDeserializer = new MirroredSolrRequestSerializer();
  return new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
}

/**
 * Creates the sink used to resubmit failed requests back to Kafka; overridable for testing.
 *
 * @param conf the Kafka cross-DC configuration
 */
protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
return new KafkaMirroringSink(conf);
}

/** Exposes the client supplier so tests can drive client creation/re-creation directly. */
@VisibleForTesting
public SolrClientSupplier getSolrClientSupplier() {
return solrClientSupplier;
}

/**
* This is where the magic happens.
*
Expand Down Expand Up @@ -219,7 +288,7 @@ public void run() {
log.warn("Failed to close kafka mirroring sink", e);
}
} finally {
IOUtils.closeQuietly(solrClient);
IOUtils.closeQuietly(solrClientSupplier);
}
}

Expand Down Expand Up @@ -506,7 +575,7 @@ public final void shutdown() {
offsetCheckExecutor.shutdown();
offsetCheckExecutor.awaitTermination(30, TimeUnit.SECONDS);
}
solrClient.close();
IOUtils.closeQuietly(solrClientSupplier);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while waiting for executor to shutdown");
Expand All @@ -516,15 +585,4 @@ public final void shutdown() {
Util.logMetrics(metrics);
}
}

/**
 * Creates a {@link CloudSolrClient} from the configured ZooKeeper connect string.
 *
 * <p>NOTE(review): this factory is superseded in this PR by {@code SolrClientSupplier}, which
 * adds lazy creation and re-creation of closed clients.
 *
 * @param conf configuration providing {@code KafkaCrossDcConf.ZK_CONNECT_STRING}
 */
protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
return new CloudSolrClient.Builder(
Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
Optional.empty())
.build();
}

/** Creates the sink used to resubmit failed requests back to Kafka; overridable for testing. */
protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
return new KafkaMirroringSink(conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
Expand Down Expand Up @@ -59,13 +60,14 @@ public class SolrMessageProcessor extends MessageProcessor
private final MetricRegistry metrics =
SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);

final CloudSolrClient client;
final Supplier<CloudSolrClient> clientSupplier;

private static final String VERSION_FIELD = "_version_";

public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
public SolrMessageProcessor(
Supplier<CloudSolrClient> clientSupplier, ResubmitBackoffPolicy resubmitBackoffPolicy) {
super(resubmitBackoffPolicy);
this.client = client;
this.clientSupplier = clientSupplier;
}

@Override
Expand Down Expand Up @@ -190,14 +192,14 @@ private Result<MirroredSolrRequest<?>> processMirroredSolrRequest(
if (log.isDebugEnabled()) {
log.debug(
"Sending request to Solr at ZK address={} with params {}",
ZkStateReader.from(client).getZkClient().getZkServerAddress(),
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
request.getParams());
}
Result<MirroredSolrRequest<?>> result;
SolrResponseBase response;
Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(), "outputTime")).time();
try {
response = (SolrResponseBase) request.process(client);
response = (SolrResponseBase) request.process(clientSupplier.get());
} finally {
ctx.stop();
}
Expand All @@ -216,7 +218,7 @@ private Result<MirroredSolrRequest<?>> processMirroredSolrRequest(
if (log.isDebugEnabled()) {
log.debug(
"Finished sending request to Solr at ZK address={} with params {} status_code={}",
ZkStateReader.from(client).getZkClient().getZkServerAddress(),
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
request.getParams(),
status);
}
Expand Down Expand Up @@ -352,7 +354,7 @@ private void connectToSolrIfNeeded() {
boolean connected = false;
while (!connected) {
try {
client.connect(); // volatile null-check if already connected
clientSupplier.get().connect(); // volatile null-check if already connected
connected = true;
} catch (Exception e) {
log.error("Unable to connect to solr server. Not consuming.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected CrossDcConsumer getCrossDcConsumer(
return new KafkaCrossDcConsumer(conf, startLatch) {
@Override
protected SolrMessageProcessor createSolrMessageProcessor() {
return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L) {
return new SolrMessageProcessor(solrClientSupplier, resubmitRequest -> 0L) {
@Override
public Result<MirroredSolrRequest<?>> handleItem(
MirroredSolrRequest<?> mirroredSolrRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void beforeSimpleSolrIntegrationTest() throws Exception {

CloudSolrClient cloudClient1 = cluster1.getSolrClient();

processor = new SolrMessageProcessor(cloudClient1, null);
processor = new SolrMessageProcessor(() -> cloudClient1, null);

CollectionAdminRequest.Create create =
CollectionAdminRequest.createCollection(COLLECTION, 1, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
Expand All @@ -73,6 +75,9 @@ public class KafkaCrossDcConsumerTest {
private KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumerMock;
private CloudSolrClient solrClientMock;
private KafkaMirroringSink kafkaMirroringSinkMock;
private ClusterStateProvider clusterStateProviderMock;
private AtomicInteger solrClientCounter = new AtomicInteger(0);
private boolean clusterStateProviderIsClosed = false;

private SolrMessageProcessor messageProcessorMock;

Expand All @@ -86,10 +91,22 @@ public static void ensureWorkingMockito() {
@Before
public void setUp() {
kafkaConsumerMock = mock(KafkaConsumer.class);
clusterStateProviderMock = mock(ClusterStateProvider.class);
doAnswer(inv -> clusterStateProviderIsClosed).when(clusterStateProviderMock).isClosed();
solrClientMock = mock(CloudSolrClient.class);
doReturn(clusterStateProviderMock).when(solrClientMock).getClusterStateProvider();
kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
messageProcessorMock = mock(SolrMessageProcessor.class);
conf = testCrossDCConf();
KafkaCrossDcConsumer.SolrClientSupplier supplier =
new KafkaCrossDcConsumer.SolrClientSupplier(null) {
@Override
protected CloudSolrClient createSolrClient() {
solrClientCounter.incrementAndGet();
return solrClientMock;
}
};

// Set necessary configurations

kafkaCrossDcConsumer =
Expand All @@ -106,8 +123,8 @@ public SolrMessageProcessor createSolrMessageProcessor() {
}

@Override
protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
return solrClientMock;
protected SolrClientSupplier createSolrClientSupplier(KafkaCrossDcConf conf) {
return supplier;
}

@Override
Expand Down Expand Up @@ -186,6 +203,15 @@ protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
assertEquals(1, startLatch.getCount());
}

@Test
public void testSolrClientSupplier() throws Exception {
  KafkaCrossDcConsumer.SolrClientSupplier supplier = kafkaCrossDcConsumer.getSolrClientSupplier();
  // First access lazily creates the client exactly once.
  supplier.get();
  assertEquals(1, solrClientCounter.get());
  // Simulate a closed ClusterStateProvider: the next access must re-create the client.
  clusterStateProviderIsClosed = true;
  supplier.get();
  assertEquals(2, solrClientCounter.get());
}

@Test
public void testRunAndShutdown() throws Exception {
// Define the expected behavior of the mocks and set up the test scenario
Expand Down Expand Up @@ -219,7 +245,6 @@ public void testRunAndShutdown() throws Exception {

// Verify that the appropriate methods were called on the mocks
verify(kafkaConsumerMock).wakeup();
verify(solrClientMock).close();

consumerThreadExecutor.shutdown();
consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static void ensureWorkingMockito() {
// Creates fresh mocks before each test; the processor under test receives the mocked
// client via a trivial Supplier lambda (matching the new Supplier-based constructor).
public void setUp() {
client = mock(CloudSolrClient.class);
resubmitBackoffPolicy = mock(ResubmitBackoffPolicy.class);
solrMessageProcessor = new SolrMessageProcessor(() -> client, resubmitBackoffPolicy);
}

/** Should handle MirroredSolrRequest and return a failed result with no retry */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static void ensureWorkingMockito() {
// Initializes @Mock fields, then spies on the processor so uncheckedSleep can be
// stubbed out (avoids real sleeps during retry/backoff tests).
public void setUp() {
MockitoAnnotations.initMocks(this);

processor = Mockito.spy(new SolrMessageProcessor(() -> solrClient, backoffPolicy));
Mockito.doNothing().when(processor).uncheckedSleep(anyLong());
}

Expand Down
Loading