From 488ff7d28302cdde6c9e8315d19c73b64ab97fa2 Mon Sep 17 00:00:00 2001 From: Michael Hamann Date: Mon, 2 Jun 2025 14:37:37 +0200 Subject: [PATCH 1/2] XWIKI-23239: Improve Solr indexing speed through parallelization and batch processing * Collect indexed documents and submit them in batches to Solr for more efficient processing and less overhead due to repeated calls to Solr. --- .../solr/internal/DefaultSolrIndexer.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/xwiki-platform-core/xwiki-platform-search/xwiki-platform-search-solr/xwiki-platform-search-solr-api/src/main/java/org/xwiki/search/solr/internal/DefaultSolrIndexer.java b/xwiki-platform-core/xwiki-platform-search/xwiki-platform-search-solr/xwiki-platform-search-solr-api/src/main/java/org/xwiki/search/solr/internal/DefaultSolrIndexer.java index f752bd5dabf1..9fe99cdffa44 100644 --- a/xwiki-platform-core/xwiki-platform-search/xwiki-platform-search-solr/xwiki-platform-search-solr-api/src/main/java/org/xwiki/search/solr/internal/DefaultSolrIndexer.java +++ b/xwiki-platform-core/xwiki-platform-search/xwiki-platform-search-solr/xwiki-platform-search-solr-api/src/main/java/org/xwiki/search/solr/internal/DefaultSolrIndexer.java @@ -20,7 +20,9 @@ package org.xwiki.search.solr.internal; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; @@ -482,6 +484,8 @@ private boolean processBatch(IndexQueueEntry queueEntry) { int length = 0; + List solrDocuments = new ArrayList<>(); + for (IndexQueueEntry batchEntry = queueEntry; batchEntry != null; batchEntry = this.indexQueue.poll()) { this.indexQueueRemovalCounter.incrementAndGet(); @@ -503,18 +507,20 @@ private boolean processBatch(IndexQueueEntry queueEntry) case INDEX: XWikiSolrInputDocument solrDocument = getSolrDocument(batchEntry.reference); if (solrDocument != null) { - solrInstance.add(solrDocument); + solrDocuments.add(solrDocument); length += solrDocument.getLength(); ++this.batchSize; } break; case DELETE: + // Ensure that all queued additions are processed before any deletions are processed. + flushDocuments(solrDocuments); applyDeletion(batchEntry); ++this.batchSize; break; case READY_MARKER: - commit(); + commit(solrDocuments); batchEntry.readyIndicator.complete(null); length = 0; break; @@ -530,14 +536,14 @@ private boolean processBatch(IndexQueueEntry queueEntry) // Commit the index changes so that they become available to queries. This is a costly operation and that is // the reason why we perform it at the end of the batch. if (shouldCommit(length, this.batchSize)) { - commit(); + commit(solrDocuments); length = 0; } } // Commit what's left if (this.batchSize > 0) { - commit(); + commit(solrDocuments); } return true; @@ -552,12 +558,24 @@ private void applyDeletion(IndexQueueEntry queueEntry) throws SolrServerExceptio } } + private void flushDocuments(List documents) throws SolrServerException, IOException + { + try { + this.solrInstance.add(documents); + } finally { + // Clear the list of documents even when adding them failed to avoid leaking memory and to avoid + // re-submitting the same documents to the Solr server multiple times. + documents.clear(); + } + } + /** * Commit. */ - private void commit() + private void commit(List documents) { try { + flushDocuments(documents); solrInstance.commit(); } catch (Exception e) { this.logger.error("Failed to commit index changes to the Solr server. Rolling back.", e); From b12297ab76e4076835969f7f7b35bd9d829c3a0b Mon Sep 17 00:00:00 2001 From: Michael Hamann Date: Wed, 4 Jun 2025 11:46:02 +0200 Subject: [PATCH 2/2] XWIKI-23239: Improve Solr indexing speed through parallelization and batch processing * Move all Solr client requests into a separate executor to allow the next batch to be prepared while the previous one is committed. --- .../solr/internal/DefaultSolrIndexer.java | 126 +++++++++++++----- 1 file changed, 95 insertions(+), 31 deletions(-) diff --git a/xwiki-platform-core/xwiki-platform-search/xwiki-platform-search-solr/xwiki-platform-search-solr-api/src/main/java/org/xwiki/search/solr/internal/DefaultSolrIndexer.java b/xwiki-platform-core/xwiki-platform-search/xwiki-platform-search-solr/xwiki-platform-search-solr-api/src/main/java/org/xwiki/search/solr/internal/DefaultSolrIndexer.java index 9fe99cdffa44..c20d06c41c86 100644 --- a/xwiki-platform-core/xwiki-platform-search/xwiki-platform-search-solr/xwiki-platform-search-solr-api/src/main/java/org/xwiki/search/solr/internal/DefaultSolrIndexer.java +++ b/xwiki-platform-core/xwiki-platform-search/xwiki-platform-search-solr/xwiki-platform-search-solr-api/src/main/java/org/xwiki/search/solr/internal/DefaultSolrIndexer.java @@ -19,11 +19,14 @@ */ package org.xwiki.search.solr.internal; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; @@ -31,7 +34,6 @@ import javax.inject.Provider; import javax.inject.Singleton; -import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.common.SolrInputDocument; import org.slf4j.Logger; import org.xwiki.bridge.internal.DocumentContextExecutor; @@ -64,6 +66,7 @@ import org.xwiki.search.solr.internal.reference.SolrReferenceResolver; import org.xwiki.store.ReadyIndicator; +import com.google.common.util.concurrent.Uninterruptibles; import com.xpn.xwiki.XWikiContext; import com.xpn.xwiki.doc.XWikiDocument; import com.xpn.xwiki.util.AbstractXWikiRunnable; @@ -361,6 +364,16 @@ private ResolveQueueEntry getQueueEntry() */ private BlockingQueue resolveQueue; + /** + * The executor for Solr client operations. + */ + private ExecutorService solrClientExecutor; + + /** + * Future to wait on the last commit operation. + */ + private Future commitFuture; + /** * Thread in which the indexUpdater will be executed. */ @@ -393,6 +406,16 @@ private ResolveQueueEntry getQueueEntry() */ private volatile int batchSize; + /** + * The length of the not yet sent batch in characters. + */ + private int batchLength; + + /** + * The size of the batch that is currently being sent and committed. + */ + private volatile int committingBatchSize; + @Override public void initialize() throws InitializationException { @@ -400,6 +423,15 @@ public void initialize() throws InitializationException this.resolveQueue = new LinkedBlockingQueue<>(); this.indexQueue = new LinkedBlockingQueue<>(this.configuration.getIndexerQueueCapacity()); + // Launch the Solr client executor. + this.solrClientExecutor = Executors.newSingleThreadExecutor(r -> { + Thread thread = new Thread(r); + thread.setName("XWiki Solr client thread"); + thread.setDaemon(true); + thread.setPriority(Thread.NORM_PRIORITY - 1); + return thread; + }); + // Launch the resolve thread this.resolveThread = new Thread(new Resolver()); this.resolveThread.setName("XWiki Solr resolve thread"); @@ -482,20 +514,19 @@ public void run() */ private boolean processBatch(IndexQueueEntry queueEntry) { - int length = 0; - List solrDocuments = new ArrayList<>(); for (IndexQueueEntry batchEntry = queueEntry; batchEntry != null; batchEntry = this.indexQueue.poll()) { this.indexQueueRemovalCounter.incrementAndGet(); if (batchEntry == INDEX_QUEUE_ENTRY_STOP) { + // Handle the shutdown of the executor here to avoid that the executor is shut down before the batch + // processing finished. + this.solrClientExecutor.shutdown(); // Discard the current batch and stop the indexing thread. return false; } - IndexOperation operation = batchEntry.operation; - // For the current contiguous operations queue, group the changes try { ExecutionContext executionContext = new ExecutionContext(); @@ -503,12 +534,12 @@ private boolean processBatch(IndexQueueEntry queueEntry) XWikiContext xcontext = (XWikiContext) executionContext.getProperty(XWikiContext.EXECUTIONCONTEXT_KEY); xcontext.setUserReference(indexingUserConfig.getIndexingUserReference()); - switch (operation) { + switch (batchEntry.operation) { case INDEX: XWikiSolrInputDocument solrDocument = getSolrDocument(batchEntry.reference); if (solrDocument != null) { solrDocuments.add(solrDocument); - length += solrDocument.getLength(); + this.batchLength += solrDocument.getLength(); ++this.batchSize; } break; @@ -521,8 +552,10 @@ private boolean processBatch(IndexQueueEntry queueEntry) break; case READY_MARKER: commit(solrDocuments); - batchEntry.readyIndicator.complete(null); - length = 0; + // Add the completion of the ready indicator to the same queue to ensure that it is processed + // after the commit. + IndexQueueEntry finalBatchEntry = batchEntry; + this.solrClientExecutor.execute(() -> finalBatchEntry.readyIndicator.complete(null)); break; default: // Do nothing. @@ -535,9 +568,8 @@ private boolean processBatch(IndexQueueEntry queueEntry) // Commit the index changes so that they become available to queries. This is a costly operation and that is // the reason why we perform it at the end of the batch. - if (shouldCommit(length, this.batchSize)) { + if (shouldCommit(this.batchLength, this.batchSize)) { commit(solrDocuments); - length = 0; } } @@ -549,19 +581,34 @@ private boolean processBatch(IndexQueueEntry queueEntry) return true; } - private void applyDeletion(IndexQueueEntry queueEntry) throws SolrServerException, IOException, SolrIndexerException + private void applyDeletion(IndexQueueEntry queueEntry) { - if (queueEntry.reference == null) { - solrInstance.deleteByQuery(queueEntry.deleteQuery); - } else { - solrInstance.delete(this.solrRefereceResolver.getId(queueEntry.reference)); - } + this.solrClientExecutor.execute(() -> { + try { + if (queueEntry.reference == null) { + this.solrInstance.deleteByQuery(queueEntry.deleteQuery); + } else { + this.solrInstance.delete(this.solrRefereceResolver.getId(queueEntry.reference)); + } + } catch (Exception e) { + this.logger.error("Failed to delete document [{}] from the Solr server", queueEntry, e); + } + }); } - private void flushDocuments(List documents) throws SolrServerException, IOException + private void flushDocuments(List documents) { try { - this.solrInstance.add(documents); + // Copy the documents to flush to a new list so that we can clear the original list without affecting the + // documents that are being added to the Solr server. + List documentsToFlush = new ArrayList<>(documents); + this.solrClientExecutor.execute(() -> { + try { + this.solrInstance.add(documentsToFlush); + } catch (Exception e) { + this.logger.error("Failed to add documents to the Solr server", e); + } + }); } finally { // Clear the list of documents even when adding them failed to avoid leaking memory and to avoid // re-submitting the same documents to the Solr server multiple times. @@ -574,21 +621,38 @@ private void flushDocuments(List documents) throws SolrServer */ private void commit(List documents) { - try { - flushDocuments(documents); - solrInstance.commit(); - } catch (Exception e) { - this.logger.error("Failed to commit index changes to the Solr server. Rolling back.", e); - + flushDocuments(documents); + if (this.commitFuture != null) { try { - solrInstance.rollback(); - } catch (Exception ex) { - // Just log the failure. - this.logger.error("Failed to rollback index changes.", ex); + Uninterruptibles.getUninterruptibly(this.commitFuture); + } catch (ExecutionException e) { + this.logger.error("Failed to commit index changes to the Solr server", e); } } + // At this point, we can be sure that there is no more commit in the queue - so we can safely store the size + // of the next commit. + this.committingBatchSize = this.batchSize; + this.commitFuture = this.solrClientExecutor.submit(() -> { + try { + this.solrInstance.commit(); + } catch (Exception e) { + this.logger.error("Failed to commit index changes to the Solr server. Rolling back.", e); + + try { + this.solrInstance.rollback(); + } catch (Exception ex) { + // Just log the failure. + this.logger.error("Failed to rollback index changes.", ex); + } + } + + this.committingBatchSize = 0; + + return null; + }); this.batchSize = 0; + this.batchLength = 0; } /** @@ -693,7 +757,7 @@ private void addToQueue(EntityReference reference, boolean recurse, IndexOperati @Override public int getQueueSize() { - return this.indexQueue.size() + this.resolveQueue.size() + this.batchSize; + return this.indexQueue.size() + this.resolveQueue.size() + this.batchSize + this.committingBatchSize; } @Override