SOLR-17804: re-create SolrClient if closed. #3411

Merged (11 commits) on Jul 9, 2025
@@ -31,6 +31,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -85,7 +87,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final int maxCollapseRecords;
private final SolrMessageProcessor messageProcessor;

protected final CloudSolrClient solrClient;
protected SolrClientSupplier solrClientSupplier;

private final ThreadPoolExecutor executor;

@@ -95,6 +97,68 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {

private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10);

/**
* Supplier for creating and managing a working CloudSolrClient instance. This class ensures that
* the CloudSolrClient instance doesn't try to use its {@link
* org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider} in a closed state, which may
* happen if, e.g., the ZooKeeper connection is lost. When this happens, the CloudSolrClient is
* re-created to ensure it can continue to function properly.
*
* <p>TODO: this functionality should be moved to the CloudSolrClient itself.
*/
public static class SolrClientSupplier implements Supplier<CloudSolrClient>, AutoCloseable {
private final AtomicReference<CloudSolrClient> solrClientRef = new AtomicReference<>();
private final String zkConnectString;

public SolrClientSupplier(String zkConnectString) {
this.zkConnectString = zkConnectString;
}

@Override
public void close() {
IOUtils.closeQuietly(solrClientRef.get());
}

protected CloudSolrClient createSolrClient() {
log.debug("Creating new SolrClient...");
return new CloudSolrClient.Builder(
Collections.singletonList(zkConnectString), Optional.empty())
.build();
}

@Override
public CloudSolrClient get() {
CloudSolrClient existingClient = solrClientRef.get();
if (existingClient == null) {
synchronized (solrClientRef) {
if (solrClientRef.get() == null) {
log.info("Initializing Solr client.");
solrClientRef.set(createSolrClient());
}
return solrClientRef.get();
}
}
if (existingClient.getClusterStateProvider().isClosed()) {
// lock out other threads and re-open the client if its ClusterStateProvider was closed
synchronized (solrClientRef) {
// refresh and check again
existingClient = solrClientRef.get();
if (existingClient.getClusterStateProvider().isClosed()) {
log.info("Re-creating Solr client because its ClusterStateProvider was closed.");
CloudSolrClient newClient = createSolrClient();
solrClientRef.set(newClient);
IOUtils.closeQuietly(existingClient);
return newClient;
} else {
return existingClient;
}
}
} else {
return existingClient;
}
}
}

/**
* @param conf The Kafka consumer configuration
* @param startLatch To inform the caller when the Consumer has started
@@ -157,7 +221,7 @@ public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
r -> new Thread(r, "KafkaCrossDcConsumerWorker"));
executor.prestartAllCoreThreads();

solrClient = createSolrClient(conf);
solrClientSupplier = createSolrClientSupplier(conf);

messageProcessor = createSolrMessageProcessor();

@@ -170,15 +234,23 @@ public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
log.info("Created Kafka resubmit producer");
}

protected SolrClientSupplier createSolrClientSupplier(KafkaCrossDcConf conf) {
return new SolrClientSupplier(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING));
}

protected SolrMessageProcessor createSolrMessageProcessor() {
return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
return new SolrMessageProcessor(solrClientSupplier, resubmitRequest -> 0L);
}

public KafkaConsumer<String, MirroredSolrRequest<?>> createKafkaConsumer(Properties properties) {
return new KafkaConsumer<>(
properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
}

protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
return new KafkaMirroringSink(conf);
}

/**
* This is where the magic happens.
*
@@ -219,7 +291,7 @@ public void run() {
log.warn("Failed to close kafka mirroring sink", e);
}
} finally {
IOUtils.closeQuietly(solrClient);
IOUtils.closeQuietly(solrClientSupplier);
}
}

@@ -506,7 +578,7 @@ public final void shutdown() {
offsetCheckExecutor.shutdown();
offsetCheckExecutor.awaitTermination(30, TimeUnit.SECONDS);
}
solrClient.close();
IOUtils.closeQuietly(solrClientSupplier);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while waiting for executor to shutdown");
@@ -516,15 +588,4 @@ public final void shutdown() {
Util.logMetrics(metrics);
}
}

protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
return new CloudSolrClient.Builder(
Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
Optional.empty())
.build();
}

protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
return new KafkaMirroringSink(conf);
}
}
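The core of this PR is SolrClientSupplier.get() above: double-checked locking extended with a liveness test, so that a client whose ClusterStateProvider has been closed (for example after a lost ZooKeeper session) is replaced with a fresh instance and the stale one is closed quietly. Below is a minimal standalone sketch of the same recreate-on-closed pattern; Resource, RecreatingSupplier, and the factory parameter are hypothetical names used for illustration, not part of the PR.

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical resource type standing in for CloudSolrClient.
interface Resource extends AutoCloseable {
  boolean isClosed();
}

// Lazily creates a resource and re-creates it whenever the current
// instance reports itself closed.
class RecreatingSupplier implements Supplier<Resource>, AutoCloseable {
  private final AtomicReference<Resource> ref = new AtomicReference<>();
  private final Supplier<Resource> factory;

  RecreatingSupplier(Supplier<Resource> factory) {
    this.factory = factory;
  }

  @Override
  public Resource get() {
    Resource existing = ref.get();
    if (existing == null || existing.isClosed()) {
      synchronized (ref) {
        // Double-check under the lock: another thread may have already
        // created or replaced the instance.
        existing = ref.get();
        if (existing == null || existing.isClosed()) {
          Resource fresh = factory.get();
          ref.set(fresh);
          if (existing != null) {
            try {
              existing.close(); // best-effort close of the stale instance
            } catch (Exception ignored) {
            }
          }
          return fresh;
        }
      }
    }
    return existing;
  }

  @Override
  public void close() throws Exception {
    Resource r = ref.get();
    if (r != null) {
      r.close();
    }
  }
}

Unlike the PR's get(), this sketch folds the null check and the closed check into one guarded path; the observable behavior is the same.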
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -59,13 +60,14 @@ public class SolrMessageProcessor extends MessageProcessor
private final MetricRegistry metrics =
SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);

final CloudSolrClient client;
final Supplier<CloudSolrClient> clientSupplier;

private static final String VERSION_FIELD = "_version_";

public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
public SolrMessageProcessor(
Supplier<CloudSolrClient> clientSupplier, ResubmitBackoffPolicy resubmitBackoffPolicy) {
super(resubmitBackoffPolicy);
this.client = client;
this.clientSupplier = clientSupplier;
}

@Override
@@ -190,14 +192,14 @@ private Result<MirroredSolrRequest<?>> processMirroredSolrRequest(
if (log.isDebugEnabled()) {
log.debug(
"Sending request to Solr at ZK address={} with params {}",
ZkStateReader.from(client).getZkClient().getZkServerAddress(),
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
request.getParams());
}
Result<MirroredSolrRequest<?>> result;
SolrResponseBase response;
Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(), "outputTime")).time();
try {
response = (SolrResponseBase) request.process(client);
response = (SolrResponseBase) request.process(clientSupplier.get());
} finally {
ctx.stop();
}
@@ -216,7 +218,7 @@ private Result<MirroredSolrRequest<?>> processMirroredSolrRequest(
if (log.isDebugEnabled()) {
log.debug(
"Finished sending request to Solr at ZK address={} with params {} status_code={}",
ZkStateReader.from(client).getZkClient().getZkServerAddress(),
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
request.getParams(),
status);
}
@@ -352,7 +354,7 @@ private void connectToSolrIfNeeded() {
boolean connected = false;
while (!connected) {
try {
client.connect(); // volatile null-check if already connected
clientSupplier.get().connect(); // volatile null-check if already connected
connected = true;
} catch (Exception e) {
log.error("Unable to connect to solr server. Not consuming.", e);
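Note that SolrMessageProcessor now resolves the client on every use via clientSupplier.get() rather than caching a CloudSolrClient in a field, so an instance re-created by the supplier is picked up on the very next request. A small sketch of that per-call resolution, assuming a hypothetical Client interface in place of CloudSolrClient:

import java.util.function.Supplier;

// Hypothetical stand-in for CloudSolrClient, for illustration only.
interface Client {
  void send(String request);
}

class PerCallClientUser {
  private final Supplier<Client> clientSupplier;

  PerCallClientUser(Supplier<Client> clientSupplier) {
    this.clientSupplier = clientSupplier;
  }

  void process(String request) {
    // Resolve at use time: if the supplier replaced the client since the
    // last call, this call transparently uses the new instance.
    clientSupplier.get().send(request);
  }
}

The cost is an extra volatile read and liveness check per request, negligible next to the network round trip to Solr.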
@@ -164,7 +164,7 @@ protected CrossDcConsumer getCrossDcConsumer(
return new KafkaCrossDcConsumer(conf, startLatch) {
@Override
protected SolrMessageProcessor createSolrMessageProcessor() {
return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L) {
return new SolrMessageProcessor(solrClientSupplier, resubmitRequest -> 0L) {
@Override
public Result<MirroredSolrRequest<?>> handleItem(
MirroredSolrRequest<?> mirroredSolrRequest) {
@@ -51,7 +51,7 @@ public static void beforeSimpleSolrIntegrationTest() throws Exception {

CloudSolrClient cloudClient1 = cluster1.getSolrClient();

processor = new SolrMessageProcessor(cloudClient1, null);
processor = new SolrMessageProcessor(() -> cloudClient1, null);

CollectionAdminRequest.Create create =
CollectionAdminRequest.createCollection(COLLECTION, 1, 1);
@@ -43,12 +43,14 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
@@ -73,6 +75,10 @@ public class KafkaCrossDcConsumerTest {
private KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumerMock;
private CloudSolrClient solrClientMock;
private KafkaMirroringSink kafkaMirroringSinkMock;
private ClusterStateProvider clusterStateProviderMock;
private KafkaCrossDcConsumer.SolrClientSupplier supplier;
private AtomicInteger solrClientCounter = new AtomicInteger(0);
private boolean clusterStateProviderIsClosed = false;

private SolrMessageProcessor messageProcessorMock;

@@ -86,10 +92,22 @@ public static void ensureWorkingMockito() {
@Before
public void setUp() {
kafkaConsumerMock = mock(KafkaConsumer.class);
clusterStateProviderMock = mock(ClusterStateProvider.class);
doAnswer(inv -> clusterStateProviderIsClosed).when(clusterStateProviderMock).isClosed();
solrClientMock = mock(CloudSolrClient.class);
doReturn(clusterStateProviderMock).when(solrClientMock).getClusterStateProvider();
kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
messageProcessorMock = mock(SolrMessageProcessor.class);
conf = testCrossDCConf();
supplier =
new KafkaCrossDcConsumer.SolrClientSupplier(null) {
@Override
protected CloudSolrClient createSolrClient() {
solrClientCounter.incrementAndGet();
return solrClientMock;
}
};

// Set necessary configurations

kafkaCrossDcConsumer =
Expand All @@ -106,8 +124,8 @@ public SolrMessageProcessor createSolrMessageProcessor() {
}

@Override
protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
return solrClientMock;
protected SolrClientSupplier createSolrClientSupplier(KafkaCrossDcConf conf) {
return supplier;
}

@Override
@@ -186,6 +204,15 @@ protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
assertEquals(1, startLatch.getCount());
}

@Test
public void testSolrClientSupplier() throws Exception {
supplier.get();
assertEquals(1, solrClientCounter.get());
clusterStateProviderIsClosed = true;
supplier.get();
assertEquals(2, solrClientCounter.get());
}

@Test
public void testRunAndShutdown() throws Exception {
// Define the expected behavior of the mocks and set up the test scenario
@@ -219,7 +246,6 @@ public void testRunAndShutdown() throws Exception {

// Verify that the appropriate methods were called on the mocks
verify(kafkaConsumerMock).wakeup();
verify(solrClientMock).close();

consumerThreadExecutor.shutdown();
consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
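The test above injects its counting supplier by overriding the protected factory method createSolrClientSupplier, the same seam the production constructor calls. A generic sketch of this protected-factory test seam, using hypothetical Service and createDependency names:

// Production code asks a factory method for its dependency...
class Service {
  protected String createDependency() {
    return "real dependency";
  }

  String describe() {
    return "using " + createDependency();
  }
}

// ...and a test subclass overrides that method to inject a fake.
class ServiceSeamDemo {
  public static void main(String[] args) {
    Service testService =
        new Service() {
          @Override
          protected String createDependency() {
            return "fake dependency";
          }
        };
    System.out.println(testService.describe()); // prints: using fake dependency
  }
}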
@@ -55,7 +55,7 @@ public static void ensureWorkingMockito() {
public void setUp() {
client = mock(CloudSolrClient.class);
resubmitBackoffPolicy = mock(ResubmitBackoffPolicy.class);
solrMessageProcessor = new SolrMessageProcessor(client, resubmitBackoffPolicy);
solrMessageProcessor = new SolrMessageProcessor(() -> client, resubmitBackoffPolicy);
}

/** Should handle MirroredSolrRequest and return a failed result with no retry */
@@ -70,7 +70,7 @@ public static void ensureWorkingMockito() {
public void setUp() {
MockitoAnnotations.initMocks(this);

processor = Mockito.spy(new SolrMessageProcessor(solrClient, backoffPolicy));
processor = Mockito.spy(new SolrMessageProcessor(() -> solrClient, backoffPolicy));
Mockito.doNothing().when(processor).uncheckedSleep(anyLong());
}
