Skip to content

Commit 530ff66

Browse files
authored
Merge pull request #8 from getindata/Package_refactoring
Package_refactoring - add internal package and expose public API
2 parents 78220e5 + 3fbeb3e commit 530ff66

File tree

51 files changed

+301
-213
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+301
-213
lines changed

CHANGELOG.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@
22

33
## [Unreleased]
44

5+
- Package refactoring. Hide internal classes that does not have to be used by API users under "internal" package.
6+
Methods defined in classes located outside "internal" package are considered "public API".
7+
Any changes to those methods should be communicated as "not backward compatible" and should be avoided.
8+
59
## [0.2.0] - 2022-07-06
610

7-
- Implement [HttpSink](src/main/java/com/getindata/connectors/http/sink/HttpSink.java) deriving from [AsyncSinkBase](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink) introduced in Flink 1.15.
8-
- Add support for Table API in HttpSink in the form of [HttpDynamicSink](src/main/java/com/getindata/connectors/http/table/HttpDynamicSink.java).
11+
- Implement [HttpSink](src/main/java/com/getindata/connectors/http/HttpSink.java) deriving from [AsyncSinkBase](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink) introduced in Flink 1.15.
12+
- Add support for Table API in HttpSink in the form of [HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java).
913

1014
## [0.1.0] - 2022-05-26
1115

12-
- Implement basic support for Http connector for Flink SQL
16+
- Implement basic support for Http connector for Flink SQL
1317

1418
[Unreleased]: https://github.com/getindata/flink-http-connector/compare/0.2.0...HEAD
1519

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ The goal for HTTP TableLookup connector was to use it in Flink SQL statement as
77
Currently, HTTP TableLookup connector supports only Lookup Joins [1] and expects JSON as a response body. It also supports only the STRING types.
88

99
#### HTTP Sink
10-
`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/table/HttpDynamicTableSinkFactory.java)).
10+
`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)).
1111

1212
## Prerequisites
1313
* Java 11
@@ -53,7 +53,7 @@ http://localhost:8080/client/service?id=1&uuid=2
5353
``
5454

5555
### HTTP Sink
56-
The following example shows the minimum Table API example to create a [HttpDynamicSink](src/main/java/com/getindata/connectors/http/table/HttpDynamicSink.java) that writes JSON values to an HTTP endpoint using POST method, assuming Flink has JAR of [JSON serializer](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/) installed:
56+
The following example shows the minimum Table API example to create a [HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicSink.java) that writes JSON values to an HTTP endpoint using POST method, assuming Flink has JAR of [JSON serializer](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/) installed:
5757

5858
```roomsql
5959
CREATE TABLE http (
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.getindata.connectors.http;
2+
3+
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
4+
import com.getindata.connectors.http.internal.sink.HttpSinkInternal;
5+
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
6+
import org.apache.flink.connector.base.sink.writer.ElementConverter;
7+
8+
/**
9+
* A public implementation for {@code HttpSink} that performs async requests against a specified
10+
* HTTP endpoint using the buffering protocol specified in
11+
* {@link org.apache.flink.connector.base.sink.AsyncSinkBase}.
12+
*
13+
* <p>
14+
* To create a new instance of this class use {@link HttpSinkBuilder}. An example would be:
15+
* <pre>
16+
* HttpSink<String> httpSink =
17+
* HttpSink.<String>builder()
18+
* .setEndpointUrl("http://example.com/myendpoint")
19+
* .setElementConverter(
20+
* (s, _context) -> new HttpSinkRequestEntry("POST", "text/plain", s.getBytes(StandardCharsets.UTF_8)))
21+
* .build();
22+
* </pre>
23+
*
24+
* @param <InputT> type of the elements that should be sent through HTTP request.
25+
*/
26+
public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
27+
28+
HttpSink(
29+
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
30+
int maxBatchSize,
31+
int maxInFlightRequests,
32+
int maxBufferedRequests,
33+
long maxBatchSizeInBytes,
34+
long maxTimeInBufferMS,
35+
long maxRecordSizeInBytes,
36+
String endpointUrl,
37+
SinkHttpClientBuilder sinkHttpClientBuilder) {
38+
super(elementConverter,
39+
maxBatchSize,
40+
maxInFlightRequests,
41+
maxBufferedRequests,
42+
maxBatchSizeInBytes,
43+
maxTimeInBufferMS,
44+
maxRecordSizeInBytes,
45+
endpointUrl,
46+
sinkHttpClientBuilder
47+
);
48+
}
49+
50+
/**
51+
* Create a {@link HttpSinkBuilder} constructing a new {@link HttpSink}.
52+
*
53+
* @param <InputT> type of the elements that should be sent through HTTP request
54+
* @return {@link HttpSinkBuilder}
55+
*/
56+
public static <InputT> HttpSinkBuilder<InputT> builder() {
57+
return new HttpSinkBuilder<>();
58+
}
59+
}

src/main/java/com/getindata/connectors/http/sink/HttpSinkBuilder.java renamed to src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
package com.getindata.connectors.http.sink;
1+
package com.getindata.connectors.http;
22

3-
import com.getindata.connectors.http.SinkHttpClient;
4-
import com.getindata.connectors.http.SinkHttpClientBuilder;
3+
import com.getindata.connectors.http.internal.SinkHttpClient;
4+
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
5+
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
56
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
67
import org.apache.flink.connector.base.sink.writer.ElementConverter;
78

@@ -13,14 +14,14 @@
1314
* <p>The following example shows the minimum setup to create a {@link HttpSink} that writes String
1415
* values to an HTTP endpoint using POST method.
1516
*
16-
* <pre>{@code
17+
* <pre>
1718
* HttpSink<String> httpSink =
1819
* HttpSink.<String>builder()
1920
* .setEndpointUrl("http://example.com/myendpoint")
2021
* .setElementConverter(
2122
* (s, _context) -> new HttpSinkRequestEntry("POST", "text/plain", s.getBytes(StandardCharsets.UTF_8)))
2223
* .build();
23-
* }</pre>
24+
* </pre>
2425
*
2526
* <p>If the following parameters are not set in this builder, the following defaults will be used:
2627
* <ul>
@@ -60,7 +61,8 @@ public HttpSinkBuilder<InputT> setEndpointUrl(String endpointUrl) {
6061
}
6162

6263
/**
63-
* @param sinkHttpClientBuilder builder for an implementation of {@link SinkHttpClient} that will be used by {@link HttpSink}
64+
* @param sinkHttpClientBuilder builder for an implementation of {@link SinkHttpClient} that
65+
* will be used by {@link HttpSink}
6466
* @return {@link HttpSinkBuilder} itself
6567
*/
6668
public HttpSinkBuilder<InputT> setSinkHttpClientBuilder(SinkHttpClientBuilder sinkHttpClientBuilder) {

src/main/java/com/getindata/connectors/http/PollingClient.java

Lines changed: 0 additions & 9 deletions
This file was deleted.

src/main/java/com/getindata/connectors/http/HttpResultConverter.java renamed to src/main/java/com/getindata/connectors/http/internal/HttpResultConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.getindata.connectors.http;
1+
package com.getindata.connectors.http.internal;
22

33
import java.io.Serializable;
44
import org.apache.flink.table.data.RowData;

src/main/java/com/getindata/connectors/http/JsonResultTableConverter.java renamed to src/main/java/com/getindata/connectors/http/internal/JsonResultTableConverter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
package com.getindata.connectors.http;
1+
package com.getindata.connectors.http.internal;
22

3-
import static com.getindata.connectors.http.table.TableSourceHelper.buildEmptyRow;
4-
import static com.getindata.connectors.http.table.TableSourceHelper.buildGenericRowData;
3+
import static com.getindata.connectors.http.internal.table.lookup.TableSourceHelper.buildEmptyRow;
4+
import static com.getindata.connectors.http.internal.table.lookup.TableSourceHelper.buildGenericRowData;
55
import static org.apache.commons.lang3.StringUtils.isNotBlank;
66

7-
import com.getindata.connectors.http.table.TableSourceHelper;
87
import com.jayway.jsonpath.DocumentContext;
98
import com.jayway.jsonpath.JsonPath;
109
import com.jayway.jsonpath.PathNotFoundException;
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.getindata.connectors.http.internal;
2+
3+
import com.getindata.connectors.http.internal.table.lookup.LookupArg;
4+
import java.util.List;
5+
6+
public interface PollingClient<T> {
7+
8+
T pull(List<LookupArg> params);
9+
}

src/main/java/com/getindata/connectors/http/PollingClientFactory.java renamed to src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
package com.getindata.connectors.http;
1+
package com.getindata.connectors.http.internal;
22

3-
import com.getindata.connectors.http.table.HttpLookupConfig;
3+
import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
44
import org.apache.flink.api.connector.source.SourceReaderContext;
55
import org.apache.flink.table.functions.FunctionContext;
66

src/main/java/com/getindata/connectors/http/SinkHttpClient.java renamed to src/main/java/com/getindata/connectors/http/internal/SinkHttpClient.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
package com.getindata.connectors.http;
1+
package com.getindata.connectors.http.internal;
22

3-
import com.getindata.connectors.http.sink.HttpSinkRequestEntry;
3+
import com.getindata.connectors.http.internal.sink.HttpSinkWriter;
4+
import com.getindata.connectors.http.internal.sink.HttpSinkInternal;
5+
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
46

57
import java.util.List;
68
import java.util.concurrent.CompletableFuture;
79

810
/**
9-
* An HTTP client that is used by {@link com.getindata.connectors.http.sink.HttpSinkWriter}
10-
* to send HTTP requests processed by {@link com.getindata.connectors.http.sink.HttpSink}.
11+
* An HTTP client that is used by {@link HttpSinkWriter}
12+
* to send HTTP requests processed by {@link HttpSinkInternal}.
1113
*/
1214
public interface SinkHttpClient {
1315
/**

0 commit comments

Comments
 (0)