Skip to content

Commit 42471ac

Browse files
kristoffSCKrzysztof Chmielewski
authored andcommitted
ESP-98ESP-98_SinkParameters - add properties to Table API.
Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
1 parent 36ae5b6 commit 42471ac

File tree

5 files changed

+96
-25
lines changed

5 files changed

+96
-25
lines changed

src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,20 @@ public HttpSinkBuilder<InputT> setElementConverter(
104104
return this;
105105
}
106106

107+
/**
108+
* Set property for Http Sink.
109+
* @param propertyName property name.
110+
* @param propertyValue property value.
111+
*/
107112
public HttpSinkBuilder<InputT> setProperty(String propertyName, String propertyValue) {
108113
this.properties.setProperty(propertyName, propertyValue);
109114
return this;
110115
}
111116

117+
/**
118+
* Add properties to Http Sink configuration
119+
* @param properties Properties to add.
120+
*/
112121
public HttpSinkBuilder<InputT> setProperties(Properties properties) {
113122
this.properties.putAll(properties);
114123
return this;

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public JavaNetSinkHttpClient(Properties properties) {
4343
Map<String, String> headerMap =
4444
ConfigUtils.propertiesToMap(properties, SINK_HEADER_PREFIX, String.class);
4545

46-
// TODO ESP-98 add tests
4746
headersAndValues = ConfigUtils.flatMapToHeaderArray(headerMap);
4847
}
4948

src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.HashMap;
44
import java.util.Map;
5+
import java.util.Properties;
56
import javax.annotation.Nullable;
67

78
import lombok.EqualsAndHashCode;
@@ -75,10 +76,15 @@
7576
public class HttpDynamicSink extends AsyncDynamicTableSink<HttpSinkRequestEntry> {
7677

7778
private final DataType consumedDataType;
79+
7880
private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
81+
7982
private final ReadableConfig tableOptions;
83+
8084
private final Map<String, String> formatContentTypeMap;
8185

86+
private final Properties properties;
87+
8288
protected HttpDynamicSink(
8389
@Nullable Integer maxBatchSize,
8490
@Nullable Integer maxInFlightRequests,
@@ -88,7 +94,8 @@ protected HttpDynamicSink(
8894
DataType consumedDataType,
8995
EncodingFormat<SerializationSchema<RowData>> encodingFormat,
9096
Map<String, String> formatContentTypeMap,
91-
ReadableConfig tableOptions
97+
ReadableConfig tableOptions,
98+
Properties properties
9299
) {
93100
super(maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBufferSizeInBytes,
94101
maxTimeInBufferMS);
@@ -100,6 +107,7 @@ protected HttpDynamicSink(
100107
"Format to content type map must not be null");
101108
this.tableOptions =
102109
Preconditions.checkNotNull(tableOptions, "Table options must not be null");
110+
this.properties = properties;
103111
}
104112

105113
@Override
@@ -124,7 +132,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
124132
insertMethod,
125133
serializationSchema.serialize(rowData)
126134
))
127-
.setProperty(CONTENT_TYPE_HEADER, contentType);
135+
.setProperty(CONTENT_TYPE_HEADER, contentType)
136+
.setProperties(properties);
128137
addAsyncOptionsToSinkBuilder(builder);
129138

130139
return SinkV2Provider.of(builder.build());
@@ -141,7 +150,8 @@ public DynamicTableSink copy() {
141150
consumedDataType,
142151
encodingFormat,
143152
new HashMap<>(formatContentTypeMap),
144-
tableOptions
153+
tableOptions,
154+
properties
145155
);
146156
}
147157

@@ -168,10 +178,15 @@ private String getContentTypeFromFormat(String format) {
168178
public static class HttpDynamicTableSinkBuilder
169179
extends AsyncDynamicTableSinkBuilder<HttpSinkRequestEntry, HttpDynamicTableSinkBuilder> {
170180

171-
private ReadableConfig tableOptions = null;
172-
private Map<String, String> formatContentTypeMap = null;
173-
private DataType consumedDataType = null;
174-
private EncodingFormat<SerializationSchema<RowData>> encodingFormat = null;
181+
private final Properties properties = new Properties();
182+
183+
private ReadableConfig tableOptions;
184+
185+
private Map<String, String> formatContentTypeMap;
186+
187+
private DataType consumedDataType;
188+
189+
private EncodingFormat<SerializationSchema<RowData>> encodingFormat;
175190

176191
/**
177192
* @param tableOptions the {@link ReadableConfig} consisting of options listed in table
@@ -208,6 +223,25 @@ public HttpDynamicTableSinkBuilder setConsumedDataType(DataType consumedDataType
208223
return this;
209224
}
210225

226+
/**
227+
* Set property for Http Sink.
228+
* @param propertyName property name.
229+
* @param propertyValue property value.
230+
*/
231+
public HttpDynamicTableSinkBuilder setProperty(String propertyName, String propertyValue) {
232+
this.properties.setProperty(propertyName, propertyValue);
233+
return this;
234+
}
235+
236+
/**
237+
* Add properties to Http Sink configuration
238+
* @param properties Properties to add.
239+
*/
240+
public HttpDynamicTableSinkBuilder setProperties(Properties properties) {
241+
this.properties.putAll(properties);
242+
return this;
243+
}
244+
211245
@Override
212246
public HttpDynamicSink build() {
213247
return new HttpDynamicSink(
@@ -219,7 +253,8 @@ public HttpDynamicSink build() {
219253
consumedDataType,
220254
encodingFormat,
221255
formatContentTypeMap,
222-
tableOptions
256+
tableOptions,
257+
properties
223258
);
224259
}
225260
}

src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.flink.table.connector.sink.DynamicTableSink;
1212
import org.apache.flink.table.factories.FactoryUtil;
1313

14+
import com.getindata.connectors.http.internal.utils.ConfigUtils;
1415
import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.INSERT_METHOD;
1516
import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.URL;
1617

@@ -26,19 +27,6 @@ public class HttpDynamicTableSinkFactory extends AsyncDynamicTableSinkFactory {
2627
Map.entry("json", "application/json")
2728
);
2829

29-
private static void validateHttpSinkOptions(ReadableConfig tableOptions)
30-
throws IllegalArgumentException {
31-
tableOptions.getOptional(INSERT_METHOD).ifPresent(insertMethod -> {
32-
if (!Set.of("POST", "PUT").contains(insertMethod)) {
33-
throw new IllegalArgumentException(
34-
String.format(
35-
"Invalid option '%s'. It is expected to be either 'POST' or 'PUT'.",
36-
INSERT_METHOD.key()
37-
));
38-
}
39-
});
40-
}
41-
4230
@Override
4331
public DynamicTableSink createDynamicTableSink(Context context) {
4432
final AsyncDynamicSinkContext factoryContext = new AsyncDynamicSinkContext(this, context);
@@ -47,16 +35,20 @@ public DynamicTableSink createDynamicTableSink(Context context) {
4735
// Validate configuration
4836
FactoryUtil.createTableFactoryHelper(this, context).validate();
4937
validateHttpSinkOptions(tableOptions);
50-
Properties properties =
38+
Properties asyncSinkProperties =
5139
new AsyncSinkConfigurationValidator(tableOptions).getValidatedConfigurations();
5240

41+
Properties httpConnectorProperties =
42+
ConfigUtils.getHttpConnectorProperties(context.getCatalogTable().getOptions());
43+
5344
HttpDynamicSink.HttpDynamicTableSinkBuilder builder =
5445
new HttpDynamicSink.HttpDynamicTableSinkBuilder()
5546
.setTableOptions(tableOptions)
5647
.setEncodingFormat(factoryContext.getEncodingFormat())
5748
.setFormatContentTypeMap(FORMAT_CONTENT_TYPE_MAP)
58-
.setConsumedDataType(factoryContext.getPhysicalDataType());
59-
addAsyncOptionsToBuilder(properties, builder);
49+
.setConsumedDataType(factoryContext.getPhysicalDataType())
50+
.setProperties(httpConnectorProperties);
51+
addAsyncOptionsToBuilder(asyncSinkProperties, builder);
6052

6153
return builder.build();
6254
}
@@ -77,4 +69,17 @@ public Set<ConfigOption<?>> optionalOptions() {
7769
options.add(INSERT_METHOD);
7870
return options;
7971
}
72+
73+
private void validateHttpSinkOptions(ReadableConfig tableOptions)
74+
throws IllegalArgumentException {
75+
tableOptions.getOptional(INSERT_METHOD).ifPresent(insertMethod -> {
76+
if (!Set.of("POST", "PUT").contains(insertMethod)) {
77+
throw new IllegalArgumentException(
78+
String.format(
79+
"Invalid option '%s'. It is expected to be either 'POST' or 'PUT'.",
80+
INSERT_METHOD.key()
81+
));
82+
}
83+
});
84+
}
8085
}

src/main/java/com/getindata/connectors/http/internal/utils/ConfigUtils.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.flink.util.StringUtils;
1111

1212
import com.getindata.connectors.http.internal.config.ConfigException;
13+
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
1314

1415
@NoArgsConstructor(access = AccessLevel.NONE)
1516
public final class ConfigUtils {
@@ -117,4 +118,26 @@ private static <T> void tryAddToConfigMap(
117118
}
118119
}
119120

121+
public static Properties getHttpConnectorProperties(Map<String, String> tableOptions) {
122+
final Properties httpProperties = new Properties();
123+
124+
if (hasHttpConnectorProperties(tableOptions)) {
125+
tableOptions.keySet().stream()
126+
.filter(key -> key.startsWith(HttpConnectorConfigConstants.GID_CONNECTOR_HTTP))
127+
.forEach(
128+
key -> {
129+
final String value = tableOptions.get(key);
130+
httpProperties.put(key, value);
131+
});
132+
}
133+
return httpProperties;
134+
}
135+
136+
private static boolean hasHttpConnectorProperties(Map<String, String> tableOptions) {
137+
return tableOptions
138+
.keySet()
139+
.stream()
140+
.anyMatch(k -> k.startsWith(HttpConnectorConfigConstants.GID_CONNECTOR_HTTP));
141+
}
142+
120143
}

0 commit comments

Comments
 (0)