Skip to content

Commit d18982c

Browse files
Support multi-threaded writes in pull based ingestion (#17912)
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent f211d34 commit d18982c

File tree

17 files changed

+608
-134
lines changed

17 files changed

+608
-134
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 3.x]
77
### Added
8+
- Add multi-threaded writer support in pull-based ingestion ([#17912](https://github.com/opensearch-project/OpenSearch/pull/17912))
89

910
### Changed
1011

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,4 +200,31 @@ public void testUpdateWithoutIDField() throws Exception {
200200
return 30 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
201201
});
202202
}
203+
204+
public void testMultiThreadedWrites() throws Exception {
205+
// create index with 5 writer threads
206+
createIndexWithDefaultSettings(indexName, 1, 0, 5);
207+
ensureGreen(indexName);
208+
209+
// Step 1: Produce messages
210+
for (int i = 0; i < 1000; i++) {
211+
produceData(Integer.toString(i), "name" + i, "25");
212+
}
213+
214+
waitForState(() -> {
215+
SearchResponse searchableDocsResponse = client().prepareSearch(indexName).setSize(2000).setPreference("_only_local").get();
216+
return searchableDocsResponse.getHits().getTotalHits().value() == 1000;
217+
});
218+
219+
// Step 2: Produce an update message for every document and validate
220+
for (int i = 0; i < 1000; i++) {
221+
produceData(Integer.toString(i), "name" + i, "30");
222+
}
223+
224+
waitForState(() -> {
225+
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(28);
226+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
227+
return response.getHits().getTotalHits().value() == 1000;
228+
});
229+
}
203230
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,10 @@ protected ResumeIngestionResponse resumeIngestion(String indexName) throws Execu
177177
}
178178

179179
protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
180-
createIndexWithDefaultSettings(indexName, numShards, numReplicas);
180+
createIndexWithDefaultSettings(indexName, numShards, numReplicas, 1);
181181
}
182182

183-
protected void createIndexWithDefaultSettings(String indexName, int numShards, int numReplicas) {
183+
protected void createIndexWithDefaultSettings(String indexName, int numShards, int numReplicas, int numProcessorThreads) {
184184
createIndex(
185185
indexName,
186186
Settings.builder()
@@ -191,6 +191,7 @@ protected void createIndexWithDefaultSettings(String indexName, int numShards, i
191191
.put("ingestion_source.param.topic", topicName)
192192
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
193193
.put("index.replication.type", "SEGMENT")
194+
.put("ingestion_source.num_processor_threads", numProcessorThreads)
194195
// set custom kafka consumer properties
195196
.put("ingestion_source.param.fetch.min.bytes", 30000)
196197
.put("ingestion_source.param.enable.auto.commit", false)

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ public void testErrorStrategy() throws Exception {
164164
.setSettings(Settings.builder().put("ingestion_source.error_strategy", "drop"))
165165
.get();
166166
waitForState(() -> "drop".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy")));
167+
resumeIngestion(indexName);
167168
waitForSearchableDocs(2, Arrays.asList(node));
168169
}
169170

@@ -248,8 +249,8 @@ public void testPaginatedGetIngestionState() throws ExecutionException, Interrup
248249
internalCluster().startClusterManagerOnlyNode();
249250
internalCluster().startDataOnlyNode();
250251
internalCluster().startDataOnlyNode();
251-
createIndexWithDefaultSettings("index1", 5, 0);
252-
createIndexWithDefaultSettings("index2", 5, 0);
252+
createIndexWithDefaultSettings("index1", 5, 0, 1);
253+
createIndexWithDefaultSettings("index2", 5, 0, 1);
253254
ensureGreen("index1");
254255
ensureGreen("index2");
255256

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,6 +831,18 @@ public Iterator<Setting<?>> settings() {
831831
Property.Dynamic
832832
);
833833

834+
/**
835+
* Defines the number of processor threads that will write to the Lucene index. Defaults to 1, which is also the minimum allowed value.
836+
*/
837+
public static final String SETTING_INGESTION_SOURCE_NUM_PROCESSOR_THREADS = "index.ingestion_source.num_processor_threads";
838+
public static final Setting<Integer> INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING = Setting.intSetting(
839+
SETTING_INGESTION_SOURCE_NUM_PROCESSOR_THREADS,
840+
1,
841+
1,
842+
Setting.Property.IndexScope,
843+
Setting.Property.Final
844+
);
845+
834846
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
835847
"index.ingestion_source.param.",
836848
key -> new Setting<>(key, "", (value) -> {
@@ -1073,11 +1085,14 @@ public IngestionSource getIngestionSource() {
10731085
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
10741086
final long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.get(settings);
10751087
final int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.get(settings);
1088+
final int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.get(settings);
1089+
10761090
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
10771091
.setPointerInitReset(pointerInitReset)
10781092
.setErrorStrategy(errorStrategy)
10791093
.setMaxPollSize(maxPollSize)
10801094
.setPollTimeout(pollTimeout)
1095+
.setNumProcessorThreads(numProcessorThreads)
10811096
.build();
10821097
}
10831098
return null;

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Objects;
1919

2020
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE;
21+
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING;
2122
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT;
2223

2324
/**
@@ -31,21 +32,24 @@ public class IngestionSource {
3132
private final Map<String, Object> params;
3233
private final long maxPollSize;
3334
private final int pollTimeout;
35+
// Number of processor threads used for writes; assigned once in the constructor, so it is final like the other fields.
private final int numProcessorThreads;
3436

3537
private IngestionSource(
3638
String type,
3739
PointerInitReset pointerInitReset,
3840
IngestionErrorStrategy.ErrorStrategy errorStrategy,
3941
Map<String, Object> params,
4042
long maxPollSize,
41-
int pollTimeout
43+
int pollTimeout,
44+
int numProcessorThreads
4245
) {
4346
this.type = type;
4447
this.pointerInitReset = pointerInitReset;
4548
this.params = params;
4649
this.errorStrategy = errorStrategy;
4750
this.maxPollSize = maxPollSize;
4851
this.pollTimeout = pollTimeout;
52+
this.numProcessorThreads = numProcessorThreads;
4953
}
5054

5155
public String getType() {
@@ -72,6 +76,10 @@ public int getPollTimeout() {
7276
return pollTimeout;
7377
}
7478

79+
public int getNumProcessorThreads() {
80+
return numProcessorThreads;
81+
}
82+
7583
@Override
7684
public boolean equals(Object o) {
7785
if (this == o) return true;
@@ -82,12 +90,13 @@ public boolean equals(Object o) {
8290
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
8391
&& Objects.equals(params, ingestionSource.params)
8492
&& Objects.equals(maxPollSize, ingestionSource.maxPollSize)
85-
&& Objects.equals(pollTimeout, ingestionSource.pollTimeout);
93+
&& Objects.equals(pollTimeout, ingestionSource.pollTimeout)
94+
&& Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads);
8695
}
8796

8897
@Override
8998
public int hashCode() {
90-
return Objects.hash(type, pointerInitReset, params, errorStrategy, maxPollSize, pollTimeout);
99+
return Objects.hash(type, pointerInitReset, params, errorStrategy, maxPollSize, pollTimeout, numProcessorThreads);
91100
}
92101

93102
@Override
@@ -108,6 +117,8 @@ public String toString() {
108117
+ maxPollSize
109118
+ ", pollTimeout="
110119
+ pollTimeout
120+
+ ", numProcessorThreads="
121+
+ numProcessorThreads
111122
+ '}';
112123
}
113124

@@ -163,6 +174,7 @@ public static class Builder {
163174
private Map<String, Object> params;
164175
private long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.getDefault(Settings.EMPTY);
165176
private int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.getDefault(Settings.EMPTY);
177+
private int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.getDefault(Settings.EMPTY);
166178

167179
public Builder(String type) {
168180
this.type = type;
@@ -206,8 +218,13 @@ public Builder setPollTimeout(int pollTimeout) {
206218
return this;
207219
}
208220

221+
public Builder setNumProcessorThreads(int numProcessorThreads) {
222+
this.numProcessorThreads = numProcessorThreads;
223+
return this;
224+
}
225+
209226
public IngestionSource build() {
210-
return new IngestionSource(type, pointerInitReset, errorStrategy, params, maxPollSize, pollTimeout);
227+
return new IngestionSource(type, pointerInitReset, errorStrategy, params, maxPollSize, pollTimeout, numProcessorThreads);
211228
}
212229

213230
}

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
269269
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING,
270270
IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING,
271271
IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING,
272+
IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE,
273+
IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT,
274+
IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING,
272275

273276
// Settings for search replica
274277
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ public void start() {
128128
ingestionErrorStrategy,
129129
initialPollerState,
130130
ingestionSource.getMaxPollSize(),
131-
ingestionSource.getPollTimeout()
131+
ingestionSource.getPollTimeout(),
132+
ingestionSource.getNumProcessorThreads()
132133
);
133134
streamPoller.start();
134135
}

0 commit comments

Comments
 (0)