Skip to content

Commit a987c60

Browse files
authored
Add initial HttpSink implementation (#4)
* Add initial HttpSink implementation * Add JavaDocs and update tests to JUnit 5 * Update numRecordsSendErrorsCounter
1 parent 79c9e50 commit a987c60

15 files changed

+804
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## [Unreleased]
44

5+
- Implement [HttpSink](src/main/java/com/getindata/connectors/http/sink/HttpSink.java) deriving from [AsyncSinkBase](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink) introduced in Flink 1.15.
6+
57
## [0.1.0] - 2022-05-26
68

79
- Implement basic support for Http connector for Flink SQL

pom.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ under the License.
6565

6666
<properties>
6767
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
68+
69+
<!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
70+
section, omitting the patch part (so for 1.15.0 use 1.15). -->
6871
<flink.version>1.15.0</flink.version>
72+
6973
<target.java.version>11</target.java.version>
7074
<scala.binary.version>2.12</scala.binary.version>
7175
<maven.compiler.source>${target.java.version}</maven.compiler.source>
@@ -139,6 +143,12 @@ under the License.
139143
<scope>provided</scope>
140144
</dependency>
141145

146+
<dependency>
147+
<groupId>org.apache.flink</groupId>
148+
<artifactId>flink-connector-base</artifactId>
149+
<version>${flink.version}</version>
150+
</dependency>
151+
142152
<!--===== Needed Dep ====-->
143153
<dependency>
144154
<groupId>com.jayway.jsonpath</groupId>
@@ -168,6 +178,14 @@ under the License.
168178
<scope>test</scope>
169179
</dependency>
170180

181+
<dependency>
182+
<groupId>org.apache.flink</groupId>
183+
<artifactId>flink-connector-base</artifactId>
184+
<version>${flink.version}</version>
185+
<type>test-jar</type>
186+
<scope>test</scope>
187+
</dependency>
188+
171189
<dependency>
172190
<groupId>org.apache.flink</groupId>
173191
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
@@ -436,6 +454,11 @@ under the License.
436454
<groupId>org.apache.maven.plugins</groupId>
437455
<artifactId>maven-javadoc-plugin</artifactId>
438456
<version>3.1.1</version>
457+
<configuration>
458+
<links>
459+
<link>https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/</link>
460+
</links>
461+
</configuration>
439462
<executions>
440463
<execution>
441464
<id>attach-javadocs</id>
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.getindata.connectors.http;
2+
3+
import com.getindata.connectors.http.sink.HttpSinkRequestEntry;
4+
5+
import java.util.List;
6+
import java.util.concurrent.CompletableFuture;
7+
8+
/**
9+
* An HTTP client that is used by {@link com.getindata.connectors.http.sink.HttpSinkWriter}
10+
* to send HTTP requests processed by {@link com.getindata.connectors.http.sink.HttpSink}.
11+
*/
12+
public interface SinkHttpClient {
13+
/**
14+
* Sends HTTP requests to an external web service.
15+
*
16+
* @param requestEntries a set of request entries that should be sent to the destination
17+
* @param endpointUrl the URL of the endpoint
18+
* @return the new {@link CompletableFuture} wrapping {@link SinkHttpClientResponse} that
19+
* completes when all requests have been sent and returned their statuses
20+
*/
21+
CompletableFuture<SinkHttpClientResponse> putRequests(List<HttpSinkRequestEntry> requestEntries, String endpointUrl);
22+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.getindata.connectors.http;
2+
3+
import java.io.Serializable;
4+
5+
/**
6+
* Builder building {@link SinkHttpClient}.
7+
*/
8+
public interface SinkHttpClientBuilder extends Serializable {
9+
SinkHttpClient build();
10+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.getindata.connectors.http;
2+
3+
import com.getindata.connectors.http.sink.HttpSinkRequestEntry;
4+
import lombok.Data;
5+
import lombok.NonNull;
6+
7+
import java.util.List;
8+
9+
/**
10+
* Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
11+
* to write, divided into two lists &mdash; successful and failed ones.
12+
*/
13+
@Data
14+
public class SinkHttpClientResponse {
15+
/**
16+
* A list of successfully written requests.
17+
*/
18+
@NonNull
19+
private final List<HttpSinkRequestEntry> successfulRequests;
20+
21+
/**
22+
* A list of requests that {@link SinkHttpClient} failed to write.
23+
*/
24+
@NonNull
25+
private final List<HttpSinkRequestEntry> failedRequests;
26+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package com.getindata.connectors.http.sink;
2+
3+
import com.getindata.connectors.http.SinkHttpClientBuilder;
4+
import org.apache.flink.connector.base.sink.AsyncSinkBase;
5+
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
6+
import org.apache.flink.connector.base.sink.writer.ElementConverter;
7+
import org.apache.flink.core.io.SimpleVersionedSerializer;
8+
import org.apache.flink.util.Preconditions;
9+
import org.apache.flink.util.StringUtils;
10+
11+
import java.io.IOException;
12+
import java.util.Collection;
13+
import java.util.Collections;
14+
15+
/**
16+
* An HTTP Sink that performs async requests against a specified HTTP endpoint using the buffering
17+
* protocol specified in {@link AsyncSinkBase}.
18+
*
19+
* <p>The behaviour of the buffering may be specified by providing configuration during the sink build time.
20+
*
21+
* <ul>
22+
* <li>{@code maxBatchSize}: the maximum size of a batch of entries that may be sent to the HTTP
23+
* endpoint;</li>
24+
* <li>{@code maxInFlightRequests}: the maximum number of in flight requests that may exist, if
25+
* any more in flight requests need to be initiated once the maximum has been reached, then it
26+
* will be blocked until some have completed;</li>
27+
* <li>{@code maxBufferedRequests}: the maximum number of elements held in the buffer, requests to
28+
* add elements will be blocked while the number of elements in the buffer is at the
29+
* maximum;</li>
30+
* <li>{@code maxBatchSizeInBytes}: the maximum size of a batch of entries that may be sent to
31+
* the HTTP endpoint measured in bytes;</li>
32+
* <li>{@code maxTimeInBufferMS}: the maximum amount of time an entry is allowed to live in the
33+
* buffer, if any element reaches this age, the entire buffer will be flushed
34+
* immediately;</li>
35+
* <li>{@code maxRecordSizeInBytes}: the maximum size of a record the sink will accept into the
36+
* buffer, a record of size larger than this will be rejected when passed to the sink.</li>
37+
* </ul>
38+
*
39+
* @param <InputT> type of the elements that should be sent through HTTP request.
40+
*/
41+
public class HttpSink<InputT> extends AsyncSinkBase<InputT, HttpSinkRequestEntry> {
42+
private final String endpointUrl;
43+
44+
// having Builder instead of an instance of `SinkHttpClient` makes it possible to serialize `HttpSink`
45+
private final SinkHttpClientBuilder sinkHttpClientBuilder;
46+
47+
protected HttpSink(
48+
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
49+
int maxBatchSize,
50+
int maxInFlightRequests,
51+
int maxBufferedRequests,
52+
long maxBatchSizeInBytes,
53+
long maxTimeInBufferMS,
54+
long maxRecordSizeInBytes,
55+
String endpointUrl,
56+
SinkHttpClientBuilder sinkHttpClientBuilder
57+
) {
58+
super(elementConverter, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes,
59+
maxTimeInBufferMS, maxRecordSizeInBytes
60+
);
61+
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(endpointUrl), "The endpoint URL must be set when initializing HTTP Sink.");
62+
this.endpointUrl = endpointUrl;
63+
this.sinkHttpClientBuilder =
64+
Preconditions.checkNotNull(sinkHttpClientBuilder, "The HTTP client builder must not be null when initializing HTTP Sink.");
65+
}
66+
67+
/**
68+
* Create a {@link HttpSinkBuilder} constructing a new {@link HttpSink}.
69+
*
70+
* @param <InputT> type of the elements that should be sent through HTTP request
71+
* @return {@link HttpSinkBuilder}
72+
*/
73+
public static <InputT> HttpSinkBuilder<InputT> builder() {
74+
return new HttpSinkBuilder<>();
75+
}
76+
77+
@Override
78+
public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> createWriter(
79+
InitContext context
80+
) throws IOException {
81+
return new HttpSinkWriter<>(
82+
getElementConverter(),
83+
context,
84+
getMaxBatchSize(),
85+
getMaxInFlightRequests(),
86+
getMaxBufferedRequests(),
87+
getMaxBatchSizeInBytes(),
88+
getMaxTimeInBufferMS(),
89+
getMaxRecordSizeInBytes(),
90+
endpointUrl,
91+
sinkHttpClientBuilder.build(),
92+
Collections.emptyList()
93+
);
94+
}
95+
96+
@Override
97+
public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> restoreWriter(
98+
InitContext context, Collection<BufferedRequestState<HttpSinkRequestEntry>> recoveredState
99+
) throws IOException {
100+
return new HttpSinkWriter<>(
101+
getElementConverter(),
102+
context,
103+
getMaxBatchSize(),
104+
getMaxInFlightRequests(),
105+
getMaxBufferedRequests(),
106+
getMaxBatchSizeInBytes(),
107+
getMaxTimeInBufferMS(),
108+
getMaxRecordSizeInBytes(),
109+
endpointUrl,
110+
sinkHttpClientBuilder.build(),
111+
recoveredState
112+
);
113+
}
114+
115+
@Override
116+
public SimpleVersionedSerializer<BufferedRequestState<HttpSinkRequestEntry>> getWriterStateSerializer() {
117+
return new HttpSinkWriterStateSerializer();
118+
}
119+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package com.getindata.connectors.http.sink;
2+
3+
import com.getindata.connectors.http.SinkHttpClient;
4+
import com.getindata.connectors.http.SinkHttpClientBuilder;
5+
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
6+
import org.apache.flink.connector.base.sink.writer.ElementConverter;
7+
8+
import java.util.Optional;
9+
10+
/**
11+
* Builder to construct {@link HttpSink}.
12+
*
13+
* <p>The following example shows the minimum setup to create a {@link HttpSink} that writes String
14+
* values to an HTTP endpoint using POST method.
15+
*
16+
* <pre>{@code
17+
* HttpSink<String> httpSink =
18+
* HttpSink.<String>builder()
19+
* .setEndpointUrl("http://example.com/myendpoint")
20+
* .setElementConverter(
21+
* (s, _context) -> new HttpSinkRequestEntry("POST", "text/plain", s.getBytes(StandardCharsets.UTF_8)))
22+
* .build();
23+
* }</pre>
24+
*
25+
* <p>If the following parameters are not set in this builder, the following defaults will be used:
26+
* <ul>
27+
* <li>{@code maxBatchSize} will be 500,</li>
28+
* <li>{@code maxInFlightRequests} will be 50,</li>
29+
* <li>{@code maxBufferedRequests} will be 10000,</li>
30+
* <li>{@code maxBatchSizeInBytes} will be 5 MB i.e. {@code 5 * 1024 * 1024},</li>
31+
* <li>{@code maxTimeInBufferMS} will be 5000ms,</li>
32+
* <li>{@code maxRecordSizeInBytes} will be 1 MB i.e. {@code 1024 * 1024}.</li>
33+
* </ul>
34+
* {@code endpointUrl} and {@code elementConverter} must be set by the user.
35+
*
36+
* @param <InputT> type of the elements that should be sent through HTTP request.
37+
*/
38+
public class HttpSinkBuilder<InputT> extends
39+
AsyncSinkBaseBuilder<InputT, HttpSinkRequestEntry, HttpSinkBuilder<InputT>> {
40+
private static final int DEFAULT_MAX_BATCH_SIZE = 500;
41+
private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
42+
private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000;
43+
private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 5 * 1024 * 1024;
44+
private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
45+
private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1024 * 1024;
46+
47+
private String endpointUrl;
48+
private SinkHttpClientBuilder sinkHttpClientBuilder;
49+
private ElementConverter<InputT, HttpSinkRequestEntry> elementConverter;
50+
51+
HttpSinkBuilder() {}
52+
53+
/**
54+
* @param endpointUrl the URL of the endpoint
55+
* @return {@link HttpSinkBuilder} itself
56+
*/
57+
public HttpSinkBuilder<InputT> setEndpointUrl(String endpointUrl) {
58+
this.endpointUrl = endpointUrl;
59+
return this;
60+
}
61+
62+
/**
63+
* @param sinkHttpClientBuilder builder for an implementation of {@link SinkHttpClient} that will be used by {@link HttpSink}
64+
* @return {@link HttpSinkBuilder} itself
65+
*/
66+
public HttpSinkBuilder<InputT> setSinkHttpClientBuilder(SinkHttpClientBuilder sinkHttpClientBuilder) {
67+
this.sinkHttpClientBuilder = sinkHttpClientBuilder;
68+
return this;
69+
}
70+
71+
/**
72+
* @param elementConverter the {@link ElementConverter} to be used for the sink
73+
* @return {@link HttpSinkBuilder} itself
74+
*/
75+
public HttpSinkBuilder<InputT> setElementConverter(ElementConverter<InputT, HttpSinkRequestEntry> elementConverter) {
76+
this.elementConverter = elementConverter;
77+
return this;
78+
}
79+
80+
@Override
81+
public HttpSink<InputT> build() {
82+
return new HttpSink<>(
83+
elementConverter,
84+
Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
85+
Optional.ofNullable(getMaxInFlightRequests()).orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
86+
Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
87+
Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
88+
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
89+
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
90+
endpointUrl,
91+
sinkHttpClientBuilder
92+
);
93+
}
94+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.getindata.connectors.http.sink;
2+
3+
import lombok.EqualsAndHashCode;
4+
import lombok.NonNull;
5+
import lombok.RequiredArgsConstructor;
6+
7+
import java.io.Serializable;
8+
9+
/**
10+
* Represents a single {@link HttpSink} request. Contains the HTTP method name, Content-Type header
11+
* value, and byte representation of the body of the request.
12+
*/
13+
@RequiredArgsConstructor
14+
@EqualsAndHashCode
15+
public final class HttpSinkRequestEntry implements Serializable {
16+
/**
17+
* HTTP method name to use when sending the request.
18+
*/
19+
@NonNull
20+
public final String method;
21+
22+
/**
23+
* Value of the Content-Type header, e.g. <i>application/json</i>.
24+
*/
25+
@NonNull
26+
public final String contentType;
27+
28+
/**
29+
* Body of the request, encoded as byte array.
30+
*/
31+
public final byte[] element;
32+
33+
/**
34+
* @return the size of the {@link HttpSinkRequestEntry#element}
35+
*/
36+
public long getSizeInBytes() {
37+
return element.length;
38+
}
39+
}

0 commit comments

Comments
 (0)