Skip to content

XWIKI-23239: Improve Solr indexing speed through parallelization and batch processing #4235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 6, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@
*/
package org.xwiki.search.solr.internal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.xwiki.bridge.internal.DocumentContextExecutor;
Expand Down Expand Up @@ -62,6 +66,7 @@
import org.xwiki.search.solr.internal.reference.SolrReferenceResolver;
import org.xwiki.store.ReadyIndicator;

import com.google.common.util.concurrent.Uninterruptibles;
import com.xpn.xwiki.XWikiContext;
import com.xpn.xwiki.doc.XWikiDocument;
import com.xpn.xwiki.util.AbstractXWikiRunnable;
Expand Down Expand Up @@ -359,6 +364,16 @@ private ResolveQueueEntry getQueueEntry()
*/
private BlockingQueue<ResolveQueueEntry> resolveQueue;

/**
 * Single-threaded executor on which all Solr client operations (add/delete/commit) are serialized, so that
 * the indexing thread can prepare the next batch while the previous one is being sent.
 */
private ExecutorService solrClientExecutor;

/**
 * Future of the last commit operation submitted to {@link #solrClientExecutor}; waited on before submitting
 * the next commit so that at most one commit is pending at a time.
 */
private Future<Void> commitFuture;

/**
* Thread in which the indexUpdater will be executed.
*/
Expand Down Expand Up @@ -391,13 +406,32 @@ private ResolveQueueEntry getQueueEntry()
*/
private volatile int batchSize;

/**
 * The length, in characters, of the documents accumulated in the current not-yet-sent batch.
 */
private int batchLength;

/**
 * The size of the batch that is currently being sent and committed. Volatile because it is presumably read
 * from other threads (e.g., when computing the queue size) while being updated by the commit logic —
 * TODO confirm the reading threads.
 */
private volatile int committingBatchSize;

@Override
public void initialize() throws InitializationException
{
// Initialize the queues before starting the threads.
this.resolveQueue = new LinkedBlockingQueue<>();
this.indexQueue = new LinkedBlockingQueue<>(this.configuration.getIndexerQueueCapacity());

// Launch the Solr client executor.
this.solrClientExecutor = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("XWiki Solr client thread");
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY - 1);
return thread;
});

// Launch the resolve thread
this.resolveThread = new Thread(new Resolver());
this.resolveThread.setName("XWiki Solr resolve thread");
Expand Down Expand Up @@ -480,43 +514,48 @@ public void run()
*/
private boolean processBatch(IndexQueueEntry queueEntry)
{
int length = 0;
List<SolrInputDocument> solrDocuments = new ArrayList<>();

for (IndexQueueEntry batchEntry = queueEntry; batchEntry != null; batchEntry = this.indexQueue.poll()) {
this.indexQueueRemovalCounter.incrementAndGet();

if (batchEntry == INDEX_QUEUE_ENTRY_STOP) {
// Handle the shutdown of the executor here to avoid that the executor is shut down before the batch
// processing finished.
this.solrClientExecutor.shutdown();
// Discard the current batch and stop the indexing thread.
return false;
}

IndexOperation operation = batchEntry.operation;

// For the current contiguous operations queue, group the changes
try {
ExecutionContext executionContext = new ExecutionContext();
this.ecim.initialize(executionContext);
XWikiContext xcontext = (XWikiContext) executionContext.getProperty(XWikiContext.EXECUTIONCONTEXT_KEY);
xcontext.setUserReference(indexingUserConfig.getIndexingUserReference());

switch (operation) {
switch (batchEntry.operation) {
case INDEX:
XWikiSolrInputDocument solrDocument = getSolrDocument(batchEntry.reference);
if (solrDocument != null) {
solrInstance.add(solrDocument);
length += solrDocument.getLength();
solrDocuments.add(solrDocument);
this.batchLength += solrDocument.getLength();
++this.batchSize;
}
break;
case DELETE:
// Ensure that all queued additions are processed before any deletions are processed.
flushDocuments(solrDocuments);
applyDeletion(batchEntry);

++this.batchSize;
break;
case READY_MARKER:
commit();
batchEntry.readyIndicator.complete(null);
length = 0;
commit(solrDocuments);
// Add the completion of the ready indicator to the same queue to ensure that it is processed
// after the commit.
IndexQueueEntry finalBatchEntry = batchEntry;
this.solrClientExecutor.execute(() -> finalBatchEntry.readyIndicator.complete(null));
break;
default:
// Do nothing.
Expand All @@ -529,48 +568,91 @@ private boolean processBatch(IndexQueueEntry queueEntry)

// Commit the index changes so that they become available to queries. This is a costly operation and that is
// the reason why we perform it at the end of the batch.
if (shouldCommit(length, this.batchSize)) {
commit();
length = 0;
if (shouldCommit(this.batchLength, this.batchSize)) {
commit(solrDocuments);
}
}

// Commit what's left
if (this.batchSize > 0) {
commit();
commit(solrDocuments);
}

return true;
}

private void applyDeletion(IndexQueueEntry queueEntry) throws SolrServerException, IOException, SolrIndexerException
private void applyDeletion(IndexQueueEntry queueEntry)
{
if (queueEntry.reference == null) {
solrInstance.deleteByQuery(queueEntry.deleteQuery);
} else {
solrInstance.delete(this.solrRefereceResolver.getId(queueEntry.reference));
this.solrClientExecutor.execute(() -> {
try {
if (queueEntry.reference == null) {
this.solrInstance.deleteByQuery(queueEntry.deleteQuery);
} else {
this.solrInstance.delete(this.solrRefereceResolver.getId(queueEntry.reference));
}
} catch (Exception e) {
this.logger.error("Failed to delete document [{}] from the Solr server", queueEntry, e);
}
});
}

private void flushDocuments(List<SolrInputDocument> documents)
{
    try {
        // Snapshot the pending documents: the caller keeps reusing its list, so the asynchronous add must
        // work on an independent copy.
        List<SolrInputDocument> snapshot = new ArrayList<>(documents);

        Runnable addTask = () -> {
            try {
                this.solrInstance.add(snapshot);
            } catch (Exception e) {
                this.logger.error("Failed to add documents to the Solr server", e);
            }
        };

        this.solrClientExecutor.execute(addTask);
    } finally {
        // Always empty the input list, even when scheduling failed, so memory is released and the same
        // documents are never submitted to the Solr server twice.
        documents.clear();
    }
}

/**
* Commit.
*/
private void commit()
private void commit(List<SolrInputDocument> documents)
{
try {
solrInstance.commit();
} catch (Exception e) {
this.logger.error("Failed to commit index changes to the Solr server. Rolling back.", e);

flushDocuments(documents);
if (this.commitFuture != null) {
try {
solrInstance.rollback();
} catch (Exception ex) {
// Just log the failure.
this.logger.error("Failed to rollback index changes.", ex);
Uninterruptibles.getUninterruptibly(this.commitFuture);
} catch (ExecutionException e) {
this.logger.error("Failed to commit index changes to the Solr server", e);
}
}

// At this point, we can be sure that there is no more commit in the queue - so we can safely store the size
// of the next commit.
this.committingBatchSize = this.batchSize;
this.commitFuture = this.solrClientExecutor.submit(() -> {
try {
this.solrInstance.commit();
} catch (Exception e) {
this.logger.error("Failed to commit index changes to the Solr server. Rolling back.", e);

try {
this.solrInstance.rollback();
} catch (Exception ex) {
// Just log the failure.
this.logger.error("Failed to rollback index changes.", ex);
}
}

this.committingBatchSize = 0;

return null;
});
this.batchSize = 0;
this.batchLength = 0;
}

/**
Expand Down Expand Up @@ -675,7 +757,7 @@ private void addToQueue(EntityReference reference, boolean recurse, IndexOperati
@Override
public int getQueueSize()
{
return this.indexQueue.size() + this.resolveQueue.size() + this.batchSize;
return this.indexQueue.size() + this.resolveQueue.size() + this.batchSize + this.committingBatchSize;
}

@Override
Expand Down
Loading