Skip to content

Commit 1c7c57e

Browse files
kristoffSCKrzysztof Chmielewski
authored andcommitted
ESP-98ESP-98_SinkParameters - changes after code review
Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
1 parent b30e090 commit 1c7c57e

File tree

9 files changed

+38
-89
lines changed

9 files changed

+38
-89
lines changed

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,4 @@ public final class HttpConnectorConfigConstants {
2121
*/
2222
public static final String SINK_HEADER_PREFIX = GID_CONNECTOR_HTTP + "sink.header.";
2323

24-
/**
25-
* A property for Content-Type HTTP header.
26-
*/
27-
public static final String CONTENT_TYPE_HEADER = SINK_HEADER_PREFIX + "Content-Type";
28-
2924
}

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public JavaNetSinkHttpClient(Properties properties) {
4343
Map<String, String> headerMap =
4444
ConfigUtils.propertiesToMap(properties, SINK_HEADER_PREFIX, String.class);
4545

46-
headersAndValues = ConfigUtils.flatMapToHeaderArray(headerMap);
46+
this.headersAndValues = ConfigUtils.toHeaderAndValueArray(headerMap);
4747
}
4848

4949
@Override

src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.getindata.connectors.http.internal.table.sink;
22

3-
import java.util.HashMap;
4-
import java.util.Map;
53
import java.util.Properties;
64
import javax.annotation.Nullable;
75

@@ -16,15 +14,13 @@
1614
import org.apache.flink.table.connector.sink.DynamicTableSink;
1715
import org.apache.flink.table.connector.sink.SinkV2Provider;
1816
import org.apache.flink.table.data.RowData;
19-
import org.apache.flink.table.factories.FactoryUtil;
2017
import org.apache.flink.table.types.DataType;
2118
import org.apache.flink.util.Preconditions;
2219

2320
import com.getindata.connectors.http.HttpSink;
2421
import com.getindata.connectors.http.HttpSinkBuilder;
2522
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
2623
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
27-
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.CONTENT_TYPE_HEADER;
2824
import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.INSERT_METHOD;
2925
import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.URL;
3026

@@ -81,8 +77,6 @@ public class HttpDynamicSink extends AsyncDynamicTableSink<HttpSinkRequestEntry>
8177

8278
private final ReadableConfig tableOptions;
8379

84-
private final Map<String, String> formatContentTypeMap;
85-
8680
private final Properties properties;
8781

8882
protected HttpDynamicSink(
@@ -93,7 +87,6 @@ protected HttpDynamicSink(
9387
@Nullable Long maxTimeInBufferMS,
9488
DataType consumedDataType,
9589
EncodingFormat<SerializationSchema<RowData>> encodingFormat,
96-
Map<String, String> formatContentTypeMap,
9790
ReadableConfig tableOptions,
9891
Properties properties
9992
) {
@@ -103,8 +96,6 @@ protected HttpDynamicSink(
10396
Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null");
10497
this.encodingFormat =
10598
Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null");
106-
this.formatContentTypeMap = Preconditions.checkNotNull(formatContentTypeMap,
107-
"Format to content type map must not be null");
10899
this.tableOptions =
109100
Preconditions.checkNotNull(tableOptions, "Table options must not be null");
110101
this.properties = properties;
@@ -121,7 +112,6 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
121112
encodingFormat.createRuntimeEncoder(context, consumedDataType);
122113

123114
var insertMethod = tableOptions.get(INSERT_METHOD);
124-
var contentType = getContentTypeFromFormat(tableOptions.get(FactoryUtil.FORMAT));
125115

126116
// TODO ESP-98 add headers to DDL and add tests for this
127117
HttpSinkBuilder<RowData> builder = HttpSink
@@ -132,7 +122,6 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
132122
insertMethod,
133123
serializationSchema.serialize(rowData)
134124
))
135-
.setProperty(CONTENT_TYPE_HEADER, contentType)
136125
.setProperties(properties);
137126
addAsyncOptionsToSinkBuilder(builder);
138127

@@ -149,7 +138,6 @@ public DynamicTableSink copy() {
149138
maxTimeInBufferMS,
150139
consumedDataType,
151140
encodingFormat,
152-
new HashMap<>(formatContentTypeMap),
153141
tableOptions,
154142
properties
155143
);
@@ -160,18 +148,6 @@ public String asSummaryString() {
160148
return "HttpSink";
161149
}
162150

163-
private String getContentTypeFromFormat(String format) {
164-
var contentType = formatContentTypeMap.get(format);
165-
if (contentType != null) {
166-
return contentType;
167-
}
168-
169-
log.warn(
170-
"Unexpected format {}. MIME type for the request will be set to \"application/{}\".",
171-
format, format);
172-
return "application/" + format;
173-
}
174-
175151
/**
176152
* Builder to construct {@link HttpDynamicSink}.
177153
*/
@@ -182,8 +158,6 @@ public static class HttpDynamicTableSinkBuilder
182158

183159
private ReadableConfig tableOptions;
184160

185-
private Map<String, String> formatContentTypeMap;
186-
187161
private DataType consumedDataType;
188162

189163
private EncodingFormat<SerializationSchema<RowData>> encodingFormat;
@@ -198,12 +172,6 @@ public HttpDynamicTableSinkBuilder setTableOptions(ReadableConfig tableOptions)
198172
return this;
199173
}
200174

201-
public HttpDynamicTableSinkBuilder setFormatContentTypeMap(
202-
Map<String, String> formatContentTypeMap) {
203-
this.formatContentTypeMap = formatContentTypeMap;
204-
return this;
205-
}
206-
207175
/**
208176
* @param encodingFormat the format for encoding records
209177
* @return {@link HttpDynamicTableSinkBuilder} itself
@@ -252,7 +220,6 @@ public HttpDynamicSink build() {
252220
getMaxTimeInBufferMS(),
253221
consumedDataType,
254222
encodingFormat,
255-
formatContentTypeMap,
256223
tableOptions,
257224
properties
258225
);

src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.getindata.connectors.http.internal.table.sink;
22

3-
import java.util.Map;
43
import java.util.Properties;
54
import java.util.Set;
65

@@ -23,11 +22,6 @@ public class HttpDynamicTableSinkFactory extends AsyncDynamicTableSinkFactory {
2322

2423
public static final String IDENTIFIER = "http-sink";
2524

26-
private static final Map<String, String> FORMAT_CONTENT_TYPE_MAP = Map.ofEntries(
27-
Map.entry("raw", "application/octet-stream"),
28-
Map.entry("json", "application/json")
29-
);
30-
3125
@Override
3226
public DynamicTableSink createDynamicTableSink(Context context) {
3327
final AsyncDynamicSinkContext factoryContext = new AsyncDynamicSinkContext(this, context);
@@ -47,7 +41,6 @@ public DynamicTableSink createDynamicTableSink(Context context) {
4741
new HttpDynamicSink.HttpDynamicTableSinkBuilder()
4842
.setTableOptions(tableOptions)
4943
.setEncodingFormat(factoryContext.getEncodingFormat())
50-
.setFormatContentTypeMap(FORMAT_CONTENT_TYPE_MAP)
5144
.setConsumedDataType(factoryContext.getPhysicalDataType())
5245
.setProperties(httpConnectorProperties);
5346
addAsyncOptionsToBuilder(asyncSinkProperties, builder);

src/main/java/com/getindata/connectors/http/internal/utils/ConfigUtils.java

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -53,31 +53,31 @@ public static <T> Map<String, T> propertiesToMap(
5353
* be in format as <b>{@code this.is.my.property.name}</b>, using "dot" as a delimiter. For this
5454
* example the returned value would be <b>{@code name}</b>.
5555
*
56-
* @param propertyKay Property name to extract the last element from.
56+
* @param propertyKey Property name to extract the last element from.
5757
* @return property last element or the property name if {@code propertyKey} parameter had no
5858
* dot delimiter.
5959
* @throws ConfigException when invalid property such as null, empty, blank, ended with dot was
6060
* used.
6161
*/
62-
public static String extractPropertyLastElement(String propertyKay) {
63-
if (StringUtils.isNullOrWhitespaceOnly(propertyKay)) {
62+
public static String extractPropertyLastElement(String propertyKey) {
63+
if (StringUtils.isNullOrWhitespaceOnly(propertyKey)) {
6464
throw new ConfigException("Provided a property name that is null, empty or blank.");
6565
}
6666

67-
if (!propertyKay.contains(PROPERTY_NAME_DELIMITER)) {
68-
return propertyKay;
67+
if (!propertyKey.contains(PROPERTY_NAME_DELIMITER)) {
68+
return propertyKey;
6969
}
7070

71-
int delimiterLastIndex = propertyKay.lastIndexOf(PROPERTY_NAME_DELIMITER);
72-
if (delimiterLastIndex == propertyKay.length() - 1) {
71+
int delimiterLastIndex = propertyKey.lastIndexOf(PROPERTY_NAME_DELIMITER);
72+
if (delimiterLastIndex == propertyKey.length() - 1) {
7373
throw new ConfigException(
7474
String.format(
7575
"Invalid property - %s. Property name should not end with property delimiter.",
76-
propertyKay)
76+
propertyKey)
7777
);
7878
}
7979

80-
return propertyKay.substring(delimiterLastIndex + 1);
80+
return propertyKey.substring(delimiterLastIndex + 1);
8181
}
8282

8383
/**
@@ -94,7 +94,7 @@ public static String extractPropertyLastElement(String propertyKay) {
9494
* String[] headers = {"header1", "val1", "header2", "val2"};
9595
* </pre>
9696
*/
97-
public static String[] flatMapToHeaderArray(Map<String, String> headerMap) {
97+
public static String[] toHeaderAndValueArray(Map<String, String> headerMap) {
9898
return headerMap
9999
.entrySet()
100100
.stream()
@@ -121,23 +121,11 @@ private static <T> void tryAddToConfigMap(
121121
public static Properties getHttpConnectorProperties(Map<String, String> tableOptions) {
122122
final Properties httpProperties = new Properties();
123123

124-
if (hasHttpConnectorProperties(tableOptions)) {
125-
tableOptions.keySet().stream()
126-
.filter(key -> key.startsWith(HttpConnectorConfigConstants.GID_CONNECTOR_HTTP))
127-
.forEach(
128-
key -> {
129-
final String value = tableOptions.get(key);
130-
httpProperties.put(key, value);
131-
});
132-
}
133-
return httpProperties;
134-
}
124+
tableOptions.entrySet().stream()
125+
.filter(entry ->
126+
entry.getKey().startsWith(HttpConnectorConfigConstants.GID_CONNECTOR_HTTP))
127+
.forEach(entry -> httpProperties.put(entry.getKey(), entry.getValue()));
135128

136-
private static boolean hasHttpConnectorProperties(Map<String, String> tableOptions) {
137-
return tableOptions
138-
.keySet()
139-
.stream()
140-
.anyMatch(k -> k.startsWith(HttpConnectorConfigConstants.GID_CONNECTOR_HTTP));
129+
return httpProperties;
141130
}
142-
143131
}

src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ public void testConnection() throws Exception {
8080
(s, _context) ->
8181
new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8)))
8282
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
83-
.setProperty(HttpConnectorConfigConstants.CONTENT_TYPE_HEADER, contentTypeHeader)
83+
.setProperty(
84+
HttpConnectorConfigConstants.SINK_HEADER_PREFIX + "Content-Type",
85+
contentTypeHeader)
8486
.build();
8587
source.sinkTo(httpSink);
8688
env.execute("Http Sink test connection");

src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkInsertTest.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public void tearDown() {
4141
@Test
4242
public void testHttpDynamicSinkDefaultPost() throws Exception {
4343
wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")).willReturn(ok()));
44+
String contentTypeHeaderValue = "application/json";
4445

4546
final String createTable =
4647
String.format(
@@ -55,10 +56,12 @@ public void testHttpDynamicSinkDefaultPost() throws Exception {
5556
+ ") with (\n"
5657
+ " 'connector' = '%s',\n"
5758
+ " 'url' = '%s',\n"
58-
+ " 'format' = 'json'\n"
59+
+ " 'format' = 'json',\n"
60+
+ " 'gid.connector.http.sink.header.Content-Type' = '%s'\n"
5961
+ ")",
6062
HttpDynamicTableSinkFactory.IDENTIFIER,
61-
"http://localhost:" + SERVER_PORT + "/myendpoint"
63+
"http://localhost:" + SERVER_PORT + "/myendpoint",
64+
contentTypeHeaderValue
6265
);
6366

6467
tEnv.executeSql(createTable);
@@ -80,12 +83,13 @@ public void testHttpDynamicSinkDefaultPost() throws Exception {
8083
request.getBodyAsString()
8184
);
8285
assertEquals(RequestMethod.POST, request.getMethod());
83-
assertEquals("application/json", request.getHeader("Content-Type"));
86+
assertEquals(contentTypeHeaderValue, request.getHeader("Content-Type"));
8487
}
8588

8689
@Test
8790
public void testHttpDynamicSinkPut() throws Exception {
8891
wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")).willReturn(ok()));
92+
String contentTypeHeaderValue = "application/json";
8993

9094
final String createTable =
9195
String.format(
@@ -101,10 +105,12 @@ public void testHttpDynamicSinkPut() throws Exception {
101105
+ " 'connector' = '%s',\n"
102106
+ " 'url' = '%s',\n"
103107
+ " 'insert-method' = 'PUT',\n"
104-
+ " 'format' = 'json'\n"
108+
+ " 'format' = 'json',\n"
109+
+ " 'gid.connector.http.sink.header.Content-Type' = '%s'\n"
105110
+ ")",
106111
HttpDynamicTableSinkFactory.IDENTIFIER,
107-
"http://localhost:" + SERVER_PORT + "/myendpoint"
112+
"http://localhost:" + SERVER_PORT + "/myendpoint",
113+
contentTypeHeaderValue
108114
);
109115

110116
tEnv.executeSql(createTable);
@@ -127,7 +133,7 @@ public void testHttpDynamicSinkPut() throws Exception {
127133
));
128134
for (var request : postedRequests) {
129135
assertEquals(RequestMethod.PUT, request.getMethod());
130-
assertEquals("application/json", request.getHeader("Content-Type"));
136+
assertEquals(contentTypeHeaderValue, request.getHeader("Content-Type"));
131137
assertTrue(jsonRequests.contains(request.getBodyAsString()));
132138
jsonRequests.remove(request.getBodyAsString());
133139
}
@@ -136,6 +142,7 @@ public void testHttpDynamicSinkPut() throws Exception {
136142
@Test
137143
public void testHttpDynamicSinkRawFormat() throws Exception {
138144
wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")).willReturn(ok()));
145+
String contentTypeHeaderValue = "application/octet-stream";
139146

140147
final String createTable =
141148
String.format(
@@ -144,10 +151,12 @@ public void testHttpDynamicSinkRawFormat() throws Exception {
144151
+ ") with (\n"
145152
+ " 'connector' = '%s',\n"
146153
+ " 'url' = '%s',\n"
147-
+ " 'format' = 'raw'\n"
154+
+ " 'format' = 'raw',\n"
155+
+ " 'gid.connector.http.sink.header.Content-Type' = '%s'\n"
148156
+ ")",
149157
HttpDynamicTableSinkFactory.IDENTIFIER,
150-
"http://localhost:" + SERVER_PORT + "/myendpoint"
158+
"http://localhost:" + SERVER_PORT + "/myendpoint",
159+
contentTypeHeaderValue
151160
);
152161

153162
tEnv.executeSql(createTable);
@@ -161,7 +170,7 @@ public void testHttpDynamicSinkRawFormat() throws Exception {
161170
var request = postedRequests.get(0);
162171
assertEquals("Clee", request.getBodyAsString());
163172
assertEquals(RequestMethod.POST, request.getMethod());
164-
assertEquals("application/octet-stream", request.getHeader("Content-Type"));
173+
assertEquals(contentTypeHeaderValue, request.getHeader("Content-Type"));
165174
}
166175

167176
@Test

src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkTest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.getindata.connectors.http.internal.table.sink;
22

3-
import java.util.Map;
4-
53
import org.apache.flink.configuration.Configuration;
64
import org.apache.flink.table.connector.ChangelogMode;
75
import org.apache.flink.table.factories.FactoryUtil;
@@ -28,7 +26,6 @@ public void testAsSummaryString() {
2826
.setConsumedDataType(
2927
new AtomicDataType(new BooleanType(false)))
3028
.setEncodingFormat(mockFormat)
31-
.setFormatContentTypeMap(Map.of())
3229
.build();
3330

3431
assertThat(dynamicSink.asSummaryString()).isEqualTo("HttpSink");
@@ -51,7 +48,6 @@ public void copyEqualityTest() {
5148
.setConsumedDataType(
5249
new AtomicDataType(new BooleanType(false)))
5350
.setEncodingFormat(mockFormat)
54-
.setFormatContentTypeMap(Map.of())
5551
.build();
5652

5753
assertEquals(sink, sink.copy());
@@ -74,7 +70,6 @@ private HttpDynamicSink.HttpDynamicTableSinkBuilder getSinkBuilder() {
7470
)
7571
.setConsumedDataType(consumedDataType)
7672
.setEncodingFormat(mockFormat)
77-
.setFormatContentTypeMap(Map.of())
7873
.setMaxBatchSize(1);
7974
}
8075

src/test/java/com/getindata/connectors/http/internal/utils/ConfigUtilsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void flatMapPropertyMap() {
9898
"my.super.propertyThree", "val3"
9999
);
100100

101-
String[] propertyArray = ConfigUtils.flatMapToHeaderArray(propertyMap);
101+
String[] propertyArray = ConfigUtils.toHeaderAndValueArray(propertyMap);
102102

103103
// size is == propertyMap.key size + propertyMap.value.size
104104
assertThat(propertyArray).hasSize(6);

0 commit comments

Comments
 (0)