
Commit d9c0ba8

add batch_bytes configuration for Flint (#329)
* add batch_bytes for FlintWriter
* change DEFAULT_REFRESH_POLICY to false
* Fix IT
* update doc
* address comments

Signed-off-by: Peng Huo <penghuo@gmail.com>
1 parent 20b761c commit d9c0ba8

Showing 11 changed files with 108 additions and 22 deletions.

docs/index.md

Lines changed: 3 additions & 3 deletions
@@ -506,9 +506,9 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema info.
 - `spark.datasource.flint.customAWSCredentialsProvider`: default is empty.
 - `spark.datasource.flint.write.id_name`: no default value.
 - `spark.datasource.flint.ignore.id_column` : default value is true.
-- `spark.datasource.flint.write.batch_size`: default value is 1000.
-- `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE
-(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
+- `spark.datasource.flint.write.batch_size`: the number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
+- `spark.datasource.flint.write.batch_bytes`: the approximate amount of data in bytes written to Flint in a single batch request. The actual amount written to OpenSearch may exceed this value. Default value is 1mb. The writing process checks after each document whether the document count has reached batch_size or the buffer size has exceeded batch_bytes; if either condition is met, the current batch is flushed and the document count resets to zero.
+- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
 - `spark.datasource.flint.read.scroll_size`: default value is 100.
 - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
 - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
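For a concrete sense of how these write options fit together, here is a minimal usage sketch against the Flint datasource. It assumes an existing Spark session; the index name and DataFrame are made up for illustration, and the option keys are the documented ones above.

// Hypothetical write: flush a bulk request once roughly 4mb of data has been
// buffered or 500 documents have been generated, whichever comes first, and
// keep the new default refresh_policy of false (no refresh requested per bulk call).
val df = spark.range(1000).toDF("aInt")

df.coalesce(1)
  .write
  .format("flint")
  .option("spark.datasource.flint.write.batch_bytes", "4mb")
  .option("spark.datasource.flint.write.batch_size", "500")
  .option("spark.datasource.flint.write.refresh_policy", "false")
  .mode("overwrite")
  .save("my_flint_index")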

flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java

Lines changed: 13 additions & 1 deletion
@@ -7,6 +7,8 @@
 
 import java.io.Serializable;
 import java.util.Map;
+
+import org.apache.spark.network.util.ByteUnit;
 import org.opensearch.flint.core.http.FlintRetryOptions;
 
 /**
@@ -74,7 +76,7 @@ public class FlintOptions implements Serializable {
   *
   * WAIT_UNTIL("wait_for")
   */
-  public static final String DEFAULT_REFRESH_POLICY = "wait_for";
+  public static final String DEFAULT_REFRESH_POLICY = "false";
 
  public static final String SOCKET_TIMEOUT_MILLIS = "socket_timeout_millis";
 
@@ -84,6 +86,10 @@
 
  public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";
 
+  public static final String BATCH_BYTES = "write.batch_bytes";
+
+  public static final String DEFAULT_BATCH_BYTES = "1mb";
+
  public FlintOptions(Map<String, String> options) {
    this.options = options;
    this.retryOptions = new FlintRetryOptions(options);
@@ -150,4 +156,10 @@ public String getDataSourceName() {
  public String getSystemIndexName() {
    return options.getOrDefault(SYSTEM_INDEX_KEY_NAME, "");
  }
+
+  public int getBatchBytes() {
+    // we do not expect this value to be larger than 10mb = 10 * 1024 * 1024
+    return (int) org.apache.spark.network.util.JavaUtils
+        .byteStringAs(options.getOrDefault(BATCH_BYTES, DEFAULT_BATCH_BYTES), ByteUnit.BYTE);
+  }
 }
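getBatchBytes relies on Spark's byte-string parsing, which is why human-readable values such as "1mb" or "4mb" are accepted, and the cast to int reflects the stated assumption that the value stays well under 10mb. A small sketch of the same parsing, with illustrative values only:

import org.apache.spark.network.util.{ByteUnit, JavaUtils}

// byteStringAs understands suffixes such as b, k/kb, m/mb, g/gb and returns the
// size in the requested unit -- here plain bytes.
val defaultBatchBytes: Long = JavaUtils.byteStringAs("1mb", ByteUnit.BYTE) // 1048576
val largerBatchBytes: Long = JavaUtils.byteStringAs("4mb", ByteUnit.BYTE)  // 4194304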

flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java

Lines changed: 4 additions & 2 deletions
@@ -246,8 +246,10 @@ public FlintReader createReader(String indexName, String query) {
  }
 
  public FlintWriter createWriter(String indexName) {
-    LOG.info("Creating Flint index writer for " + indexName);
-    return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
+    LOG.info(String.format("Creating Flint index writer for %s, refresh_policy:%s, " +
+        "batch_bytes:%d", indexName, options.getRefreshPolicy(), options.getBatchBytes()));
+    return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName),
+        options.getRefreshPolicy(), options.getBatchBytes());
  }
 
  @Override

flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintWriter.java

Lines changed: 5 additions & 0 deletions
@@ -19,4 +19,9 @@ public abstract class FlintWriter extends Writer {
   * { "title": "Prisoners", "year": 2013 }
   */
  public static final String ACTION_CREATE = "create";
+
+  /**
+   * @return current data written into buffer in bytes.
+   */
+  public abstract long getBufferSize();
 }

flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java

Lines changed: 15 additions & 7 deletions
@@ -14,7 +14,9 @@
 import org.opensearch.flint.core.IRestHighLevelClient;
 import org.opensearch.rest.RestStatus;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 /**
@@ -27,19 +29,21 @@ public class OpenSearchWriter extends FlintWriter {
 
  private final String refreshPolicy;
 
-  private StringBuilder sb;
+  private final ByteArrayOutputStream baos;
 
  private IRestHighLevelClient client;
 
-  public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy) {
+  public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy,
+      int bufferSizeInBytes) {
    this.client = client;
    this.indexName = indexName;
-    this.sb = new StringBuilder();
    this.refreshPolicy = refreshPolicy;
+    this.baos = new ByteArrayOutputStream(bufferSizeInBytes);
  }
 
  @Override public void write(char[] cbuf, int off, int len) {
-    sb.append(cbuf, off, len);
+    byte[] bytes = new String(cbuf, off, len).getBytes(StandardCharsets.UTF_8);
+    baos.write(bytes, 0, bytes.length);
  }
 
  /**
@@ -48,8 +52,8 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
   */
  @Override public void flush() {
    try {
-      if (sb.length() > 0) {
-        byte[] bytes = sb.toString().getBytes();
+      if (baos.size() > 0) {
+        byte[] bytes = baos.toByteArray();
        BulkResponse
            response =
            client.bulk(
@@ -63,7 +67,7 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
    } catch (IOException e) {
      throw new RuntimeException(String.format("Failed to execute bulk request on index: %s", indexName), e);
    } finally {
-      sb.setLength(0);
+      baos.reset();
    }
  }
 
@@ -78,6 +82,10 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
    }
  }
 
+  public long getBufferSize() {
+    return baos.size();
+  }
+
  private boolean isCreateConflict(BulkItemResponse itemResp) {
    return itemResp.getOpType() == DocWriteRequest.OpType.CREATE && (itemResp.getFailure() == null || itemResp.getFailure()
        .getStatus() == RestStatus.CONFLICT);
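The switch from StringBuilder to ByteArrayOutputStream makes getBufferSize report actual UTF-8 byte counts rather than character counts. A simplified, standalone sketch of that buffering pattern follows, with a stand-in callback instead of the real OpenSearch bulk call; the class and parameter names here are hypothetical.

import java.io.{ByteArrayOutputStream, Writer}
import java.nio.charset.StandardCharsets

// Minimal stand-in for the buffering behaviour: bytes accumulate until the
// caller flushes, at which point the buffer is handed off and reset.
class BufferedBulkWriter(onFlush: Array[Byte] => Unit, initialCapacity: Int) extends Writer {
  private val baos = new ByteArrayOutputStream(initialCapacity)

  override def write(cbuf: Array[Char], off: Int, len: Int): Unit = {
    // Encode the characters as UTF-8 so the buffer size reflects wire bytes.
    val bytes = new String(cbuf, off, len).getBytes(StandardCharsets.UTF_8)
    baos.write(bytes, 0, bytes.length)
  }

  override def flush(): Unit = {
    if (baos.size() > 0) onFlush(baos.toByteArray)
    baos.reset()
  }

  override def close(): Unit = flush()

  // Mirrors FlintWriter.getBufferSize: current buffered data in bytes.
  def getBufferSize: Long = baos.size().toLong
}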

flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -60,7 +60,7 @@ case class FlintPartitionWriter(
    gen.writeLineEnding()
 
    docCount += 1
-    if (docCount >= options.batchSize()) {
+    if (docCount >= options.batchSize() || gen.getBufferSize >= options.batchBytes()) {
      gen.flush()
      docCount = 0
    }
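This one-line change is the core of the feature: FlintPartitionWriter now flushes when either the document count reaches batch_size or the generator's buffered bytes reach batch_bytes. A hedged, self-contained sketch of that dual-threshold loop (writeAll and flushBatch are made-up names; the real code flushes through FlintJacksonGenerator):

// Hypothetical driver loop showing the two flush triggers; batchSize and
// batchBytes mirror the options read from FlintSparkConf.
def writeAll(
    docs: Iterator[String],
    batchSize: Int,
    batchBytes: Long,
    flushBatch: Seq[String] => Unit): Unit = {
  var buffer = Vector.empty[String]
  var bufferedBytes = 0L
  for (doc <- docs) {
    buffer :+= doc
    bufferedBytes += doc.getBytes("UTF-8").length
    // Flush when either the document count or the byte budget is reached,
    // then reset both counters -- the same shape as FlintPartitionWriter.
    if (buffer.size >= batchSize || bufferedBytes >= batchBytes) {
      flushBatch(buffer)
      buffer = Vector.empty
      bufferedBytes = 0L
    }
  }
  if (buffer.nonEmpty) flushBatch(buffer)
}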

flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala

Lines changed: 14 additions & 2 deletions
@@ -14,6 +14,7 @@ import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.http.FlintRetryOptions
 
 import org.apache.spark.internal.config.ConfigReader
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.flint.config.FlintSparkConf._
 import org.apache.spark.sql.internal.SQLConf
 
@@ -95,7 +96,14 @@ object FlintSparkConf {
      "The number of documents written to Flint in a single batch request is determined by the " +
      "overall size of the HTTP request, which should not exceed 100MB. The actual number of " +
      "documents will vary depending on the individual size of each document.")
-    .createWithDefault("1000")
+    .createWithDefault(Integer.MAX_VALUE.toString)
+
+  val BATCH_BYTES = FlintConfig(s"spark.datasource.flint.${FlintOptions.BATCH_BYTES}")
+    .datasourceOption()
+    .doc(
+      "The approximate amount of data in bytes written to Flint in a single batch request. " +
+        "The actual data written to OpenSearch may exceed this value. Default value is 1mb.")
+    .createWithDefault(FlintOptions.DEFAULT_BATCH_BYTES)
 
  val REFRESH_POLICY = FlintConfig("spark.datasource.flint.write.refresh_policy")
    .datasourceOption()
@@ -194,6 +202,9 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
 
  def batchSize(): Int = BATCH_SIZE.readFrom(reader).toInt
 
+  def batchBytes(): Long = org.apache.spark.network.util.JavaUtils
+    .byteStringAs(BATCH_BYTES.readFrom(reader), ByteUnit.BYTE)
+
  def docIdColumnName(): Option[String] = DOC_ID_COLUMN_NAME.readFrom(reader)
 
  def ignoreIdColumn(): Boolean = IGNORE_DOC_ID_COLUMN.readFrom(reader).toBoolean
@@ -237,7 +248,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
      PASSWORD,
      SOCKET_TIMEOUT_MILLIS,
      JOB_TYPE,
-      REPL_INACTIVITY_TIMEOUT_MILLIS)
+      REPL_INACTIVITY_TIMEOUT_MILLIS,
+      BATCH_BYTES)
      .map(conf => (conf.optionKey, conf.readFrom(reader)))
      .toMap
 
flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonGenerator.scala

Lines changed: 6 additions & 3 deletions
@@ -5,10 +5,9 @@
 
 package org.apache.spark.sql.flint.json
 
-import java.io.Writer
-
 import com.fasterxml.jackson.core.JsonFactory
 import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
+import org.opensearch.flint.core.storage.FlintWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -24,7 +23,7 @@ import org.apache.spark.sql.types._
  */
 case class FlintJacksonGenerator(
    dataType: DataType,
-    writer: Writer,
+    writer: FlintWriter,
    options: JSONOptions,
    ignoredFieldName: Option[String] = None) {
  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
@@ -314,4 +313,8 @@
      })
    })
  }
+
+  def getBufferSize: Long = {
+    writer.getBufferSize;
+  }
 }

flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala

Lines changed: 11 additions & 2 deletions
@@ -9,7 +9,6 @@ import java.util.Optional
 
 import scala.collection.JavaConverters._
 
-import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.http.FlintRetryOptions._
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
@@ -27,7 +26,7 @@ class FlintSparkConfSuite extends FlintSuite {
 
      // default value
      assert(flintOptions.getPort == 9200)
-      assert(flintOptions.getRefreshPolicy == "wait_for")
+      assert(flintOptions.getRefreshPolicy == "false")
    }
  }
 
@@ -75,6 +74,16 @@
    }
  }
 
+  test("test batch bytes options") {
+    val defaultConf = FlintSparkConf(Map[String, String]().asJava)
+    defaultConf.batchBytes() shouldBe 1024 * 1024
+    defaultConf.flintOptions().getBatchBytes shouldBe 1024 * 1024
+
+    val overrideConf = FlintSparkConf(Map("write.batch_bytes" -> "4mb").asJava)
+    overrideConf.batchBytes() shouldBe 4 * 1024 * 1024
+    overrideConf.flintOptions().getBatchBytes shouldBe 4 * 1024 * 1024
+  }
+
  /**
   * Delete index `indexNames` after calling `f`.
   */

integ-test/src/test/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala

Lines changed: 34 additions & 1 deletion
@@ -216,7 +216,7 @@ class FlintDataSourceV2ITSuite
      df.coalesce(1)
        .write
        .format("flint")
-        .options(options + ("spark.flint.write.batch.size" -> s"$batchSize"))
+        .options(options + ("spark.flint.write.batch_size" -> s"$batchSize"))
        .mode("overwrite")
        .save(indexName)
 
@@ -499,6 +499,39 @@
    }
  }
 
+  test("write dataframe to flint with batch bytes configuration") {
+    val indexName = "tbatchbytes"
+    val options = openSearchOptions + (s"${DOC_ID_COLUMN_NAME.optionKey}" -> "aInt")
+    Seq("0b", "10b", "1mb").foreach(batchBytes => {
+      withIndexName(indexName) {
+        val mappings =
+          """{
+            |  "properties": {
+            |    "aInt": {
+            |      "type": "integer"
+            |    }
+            |  }
+            |}""".stripMargin
+        index(indexName, oneNodeSetting, mappings, Seq.empty)
+
+        val df = spark.range(15).toDF("aInt")
+        df.coalesce(1)
+          .write
+          .format("flint")
+          .options(options + ("spark.flint.write.batch_bytes" -> s"$batchBytes"))
+          .mode("overwrite")
+          .save(indexName)
+
+        checkAnswer(
+          spark.sqlContext.read
+            .format("flint")
+            .options(openSearchOptions)
+            .load(indexName),
+          df)
+      }
+    })
+  }
+
  /**
   * Copy from SPARK JDBCV2Suite.
   */

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala

Lines changed: 2 additions & 0 deletions
@@ -17,6 +17,7 @@ import play.api.libs.json._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util._
 
@@ -110,6 +111,7 @@ trait FlintJobExecutor {
    try {
      resultData.write
        .format("flint")
+        .option(REFRESH_POLICY.optionKey, "wait_for")
        .mode("append")
        .save(resultIndex)
      IRestHighLevelClient.recordOperationSuccess(
