Skip to content

Commit ba68043

Browse files
authored
ESP-221_BasicAuth - adding HeaderPreprocessor and support for Basic Authentication mechanism. (#26)
* ESP-221_BasicAuth - adding HeaderPreprocessor.java Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com> * ESP-221_BasicAuth - Changes after code review. Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com> Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
1 parent fc7fca7 commit ba68043

26 files changed

+479
-117
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@
44

55
### Added
66

7+
- Add Http Header value preprocessor mechanism, that can preprocess defined header value before setting it on the request.
8+
- Allow user to specify `Authorization` header for Basic Authentication. The value will be converted to Base64,
9+
or if it starts from prefix `Basic `, it will be used as is (without any extra modification).
710
- Add TLS and mTLS support for Http Sink and Lookup Source connectors.
811
New properties are:
912
- `gid.connector.http.security.cert.server` - path to server's certificate.
1013
- `gid.connector.http.security.cert.client` - path to connector's certificate.
1114
- `gid.connector.http.security.key.client` - path to connector's private key.
12-
- `gid.connector.http.security.cert.server.allowSelfSigned` - allowing for self signed certificates without adding them to KeyStore (not recommended for a production).
15+
- `gid.connector.http.security.cert.server.allowSelfSigned` - allowing for self-signed certificates without adding them to KeyStore (not recommended for a production).
1316

1417
## [0.4.0] - 2022-08-31
1518

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,11 @@ For non production environments it is sometimes necessary to use Https connectio
231231
In this special case, you can configure connector to trust all certificates without adding them to keystore.
232232
To enable this option use `gid.connector.http.security.cert.server.allowSelfSigned` property setting its value to `true`.
233233

234+
## Basic Authentication
235+
Connector supports Basic Authentication mechanism using HTTP `Authorization` header.
236+
The header value can set via properties same as other headers. Connector will convert passed value to Base64 and use it for request.
237+
If the used value starts from prefix `Basic `, it will be used as header value as is, without any extra modification.
238+
234239
## Table API Connector Options
235240
### HTTP TableLookup Source
236241
| Option | Required | Description/Value |

src/main/java/com/getindata/connectors/http/HttpSink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.apache.flink.annotation.PublicEvolving;
66
import org.apache.flink.connector.base.sink.writer.ElementConverter;
77

8+
import com.getindata.connectors.http.internal.HeaderPreprocessor;
89
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
910
import com.getindata.connectors.http.internal.sink.HttpSinkInternal;
1011
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
@@ -41,8 +42,10 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
4142
long maxRecordSizeInBytes,
4243
String endpointUrl,
4344
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
45+
HeaderPreprocessor headerPreprocessor,
4446
SinkHttpClientBuilder sinkHttpClientBuilder,
4547
Properties properties) {
48+
4649
super(elementConverter,
4750
maxBatchSize,
4851
maxInFlightRequests,
@@ -52,6 +55,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
5255
maxRecordSizeInBytes,
5356
endpointUrl,
5457
httpPostRequestCallback,
58+
headerPreprocessor,
5559
sinkHttpClientBuilder,
5660
properties
5761
);

src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
88
import org.apache.flink.connector.base.sink.writer.ElementConverter;
99

10+
import com.getindata.connectors.http.internal.HeaderPreprocessor;
1011
import com.getindata.connectors.http.internal.SinkHttpClient;
1112
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
1213
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
1314
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
1415
import com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallback;
16+
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
1517

1618
/**
1719
* Builder to construct {@link HttpSink}.
@@ -63,6 +65,9 @@ public class HttpSinkBuilder<InputT> extends
6365
private static final HttpPostRequestCallback<HttpSinkRequestEntry>
6466
DEFAULT_POST_REQUEST_CALLBACK = new Slf4jHttpPostRequestCallback();
6567

68+
private static final HeaderPreprocessor DEFAULT_HEADER_PREPROCESSOR =
69+
HttpHeaderUtils.createDefaultHeaderPreprocessor();
70+
6671
private final Properties properties = new Properties();
6772

6873
// Mandatory field
@@ -77,9 +82,13 @@ public class HttpSinkBuilder<InputT> extends
7782
// If not defined, should be set to DEFAULT_POST_REQUEST_CALLBACK
7883
private HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
7984

85+
// If not defined, should be set to DEFAULT_HEADER_PREPROCESSOR
86+
private HeaderPreprocessor headerPreprocessor;
87+
8088
HttpSinkBuilder() {
8189
this.sinkHttpClientBuilder = DEFAULT_CLIENT_BUILDER;
8290
this.httpPostRequestCallback = DEFAULT_POST_REQUEST_CALLBACK;
91+
this.headerPreprocessor = DEFAULT_HEADER_PREPROCESSOR;
8392
}
8493

8594
/**
@@ -119,6 +128,12 @@ public HttpSinkBuilder<InputT> setHttpPostRequestCallback(
119128
return this;
120129
}
121130

131+
public HttpSinkBuilder<InputT> setHttpHeaderPreprocessor(
132+
HeaderPreprocessor headerPreprocessor) {
133+
this.headerPreprocessor = headerPreprocessor;
134+
return this;
135+
}
136+
122137
/**
123138
* Set property for Http Sink.
124139
* @param propertyName property name
@@ -152,6 +167,7 @@ public HttpSink<InputT> build() {
152167
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
153168
endpointUrl,
154169
httpPostRequestCallback,
170+
headerPreprocessor,
155171
sinkHttpClientBuilder,
156172
properties
157173
);
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.getindata.connectors.http.internal;
2+
3+
import java.util.Base64;
4+
5+
/**
6+
* Header processor for HTTP Basic Authentication mechanism.
7+
* Only "Basic" authentication is supported currently.
8+
*/
9+
public class BasicAuthHeaderValuePreprocessor implements HeaderValuePreprocessor {
10+
11+
public static final String BASIC = "Basic ";
12+
13+
/**
14+
* Calculates {@link Base64} value of provided header value. For Basic authentication mechanism,
15+
* the raw value is expected to match user:password pattern.
16+
* <p>
17+
* If rawValue starts with "Basic " prefix it is assumed that this value is already converted to
18+
* expected "Authorization" header value.
19+
*
20+
* @param rawValue header original value to modify.
21+
* @return value of "Authorization" header with format "Basic " + Base64 from rawValue or
22+
* rawValue without any changes if it starts with "Basic " prefix.
23+
*/
24+
@Override
25+
public String preprocessHeaderValue(String rawValue) {
26+
if (rawValue.startsWith(BASIC)) {
27+
return rawValue;
28+
} else {
29+
return BASIC + Base64.getEncoder().encodeToString(rawValue.getBytes());
30+
}
31+
}
32+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.getindata.connectors.http.internal;
2+
3+
import java.util.Collections;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
/**
8+
* This implementation of {@link HeaderPreprocessor} acts as a registry for all {@link
9+
* HeaderValuePreprocessor} that should be applied on HTTP request.
10+
*/
11+
public class ComposeHeaderPreprocessor implements HeaderPreprocessor {
12+
13+
/**
14+
* Default, pass through header value preprocessor used whenever dedicated preprocessor for a
15+
* given header does not exist.
16+
*/
17+
private static final HeaderValuePreprocessor DEFAULT_VALUE_PREPROCESSOR = rawValue -> rawValue;
18+
19+
/**
20+
* Map with {@link HeaderValuePreprocessor} to apply.
21+
*/
22+
private final Map<String, HeaderValuePreprocessor> valuePreprocessors;
23+
24+
/**
25+
* Creates a new instance of ComposeHeaderPreprocessor for provided {@link
26+
* HeaderValuePreprocessor} map.
27+
*
28+
* @param valuePreprocessors map of {@link HeaderValuePreprocessor} that should be used for this
29+
* processor. If null, then default, pass through header value
30+
* processor will be used for every header.
31+
*/
32+
public ComposeHeaderPreprocessor(Map<String, HeaderValuePreprocessor> valuePreprocessors) {
33+
this.valuePreprocessors = (valuePreprocessors == null)
34+
? Collections.emptyMap()
35+
: new HashMap<>(valuePreprocessors);
36+
}
37+
38+
@Override
39+
public String preprocessValueForHeader(String headerName, String headerRawValue) {
40+
return valuePreprocessors
41+
.getOrDefault(headerName, DEFAULT_VALUE_PREPROCESSOR)
42+
.preprocessHeaderValue(headerRawValue);
43+
}
44+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.getindata.connectors.http.internal;
2+
3+
import java.io.Serializable;
4+
5+
/**
6+
* Interface for header preprocessing
7+
*/
8+
public interface HeaderPreprocessor extends Serializable {
9+
10+
/**
11+
* Preprocess value of a header.Preprocessing can change or validate header value.
12+
* @param headerName header name which value should be preprocessed.
13+
* @param headerRawValue header value to process.
14+
* @return preprocessed header value.
15+
*/
16+
String preprocessValueForHeader(String headerName, String headerRawValue);
17+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.getindata.connectors.http.internal;
2+
3+
import java.io.Serializable;
4+
5+
/**
6+
* Processor interface which modifies header value based on implemented logic.
7+
* An example would be calculation of Value of Authorization header.
8+
*/
9+
public interface HeaderValuePreprocessor extends Serializable {
10+
11+
/**
12+
* Modifies header rawValue according to the implemented logic.
13+
* @param rawValue header original value to modify
14+
* @return modified header value.
15+
*/
16+
String preprocessHeaderValue(String rawValue);
17+
18+
}

src/main/java/com/getindata/connectors/http/internal/SinkHttpClientBuilder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@
1313
*/
1414
@PublicEvolving
1515
public interface SinkHttpClientBuilder extends Serializable {
16+
17+
// TODO Consider moving HttpPostRequestCallback and HeaderPreprocessor to be a
18+
// SinkHttpClientBuilder fields. This method is getting more and more arguments.
1619
SinkHttpClient build(
1720
Properties properties,
18-
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback
21+
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
22+
HeaderPreprocessor headerPreprocessor
1923
);
2024
}

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
import org.apache.flink.util.StringUtils;
1414

1515
import com.getindata.connectors.http.HttpPostRequestCallback;
16+
import com.getindata.connectors.http.internal.HeaderPreprocessor;
1617
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
1718

18-
1919
/**
2020
* An internal implementation of HTTP Sink that performs async requests against a specified HTTP
2121
* endpoint using the buffering protocol specified in {@link AsyncSinkBase}.
@@ -60,6 +60,8 @@ public class HttpSinkInternal<InputT> extends AsyncSinkBase<InputT, HttpSinkRequ
6060

6161
private final HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
6262

63+
private final HeaderPreprocessor headerPreprocessor;
64+
6365
private final Properties properties;
6466

6567
protected HttpSinkInternal(
@@ -72,13 +74,20 @@ protected HttpSinkInternal(
7274
long maxRecordSizeInBytes,
7375
String endpointUrl,
7476
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
77+
HeaderPreprocessor headerPreprocessor,
7578
SinkHttpClientBuilder sinkHttpClientBuilder,
76-
Properties properties
77-
) {
78-
super(elementConverter, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
79+
Properties properties) {
80+
81+
super(
82+
elementConverter,
83+
maxBatchSize,
84+
maxInFlightRequests,
85+
maxBufferedRequests,
7986
maxBatchSizeInBytes,
80-
maxTimeInBufferMS, maxRecordSizeInBytes
87+
maxTimeInBufferMS,
88+
maxRecordSizeInBytes
8189
);
90+
8291
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(endpointUrl),
8392
"The endpoint URL must be set when initializing HTTP Sink.");
8493
this.endpointUrl = endpointUrl;
@@ -87,6 +96,10 @@ protected HttpSinkInternal(
8796
httpPostRequestCallback,
8897
"Post request callback must be set when initializing HTTP Sink."
8998
);
99+
this.headerPreprocessor = Preconditions.checkNotNull(
100+
headerPreprocessor,
101+
"Header Preprocessor must be set when initializing HTTP Sink."
102+
);
90103
this.sinkHttpClientBuilder =
91104
Preconditions.checkNotNull(sinkHttpClientBuilder,
92105
"The HTTP client builder must not be null when initializing HTTP Sink.");
@@ -107,7 +120,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
107120
getMaxTimeInBufferMS(),
108121
getMaxRecordSizeInBytes(),
109122
endpointUrl,
110-
sinkHttpClientBuilder.build(properties, httpPostRequestCallback),
123+
sinkHttpClientBuilder.build(properties, httpPostRequestCallback, headerPreprocessor),
111124
Collections.emptyList()
112125
);
113126
}
@@ -126,7 +139,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
126139
getMaxTimeInBufferMS(),
127140
getMaxRecordSizeInBytes(),
128141
endpointUrl,
129-
sinkHttpClientBuilder.build(properties, httpPostRequestCallback),
142+
sinkHttpClientBuilder.build(properties, httpPostRequestCallback, headerPreprocessor),
130143
recoveredState
131144
);
132145
}

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.flink.annotation.VisibleForTesting;
1616

1717
import com.getindata.connectors.http.HttpPostRequestCallback;
18+
import com.getindata.connectors.http.internal.HeaderPreprocessor;
1819
import com.getindata.connectors.http.internal.SinkHttpClient;
1920
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
2021
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
@@ -23,9 +24,8 @@
2324
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
2425
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
2526
import com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallback;
26-
import com.getindata.connectors.http.internal.utils.ConfigUtils;
27+
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
2728
import com.getindata.connectors.http.internal.utils.JavaNetHttpClientFactory;
28-
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SINK_HEADER_PREFIX;
2929

3030
/**
3131
* An implementation of {@link SinkHttpClient} that uses Java 11's {@link HttpClient}.
@@ -36,32 +36,31 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
3636

3737
private final HttpClient httpClient;
3838

39-
private final Map<String, String> headerMap;
40-
4139
private final String[] headersAndValues;
4240

41+
private final Map<String, String> headerMap;
42+
4343
private final HttpStatusCodeChecker statusCodeChecker;
4444

4545
private final HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
4646

47-
public JavaNetSinkHttpClient(Properties properties) {
48-
this(properties, new Slf4jHttpPostRequestCallback());
47+
public JavaNetSinkHttpClient(Properties properties, HeaderPreprocessor headerPreprocessor) {
48+
this(properties, new Slf4jHttpPostRequestCallback(), headerPreprocessor);
4949
}
5050

5151
public JavaNetSinkHttpClient(
5252
Properties properties,
53-
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback) {
53+
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
54+
HeaderPreprocessor headerPreprocessor) {
5455

5556
this.httpClient = JavaNetHttpClientFactory.createClient(properties);
5657
this.httpPostRequestCallback = httpPostRequestCallback;
57-
58-
var propertiesHeaderMap =
59-
ConfigUtils.propertiesToMap(properties, SINK_HEADER_PREFIX, String.class);
60-
this.headersAndValues = ConfigUtils.toHeaderAndValueArray(propertiesHeaderMap);
61-
this.headerMap = new HashMap<>();
62-
propertiesHeaderMap
63-
.forEach((key, value) ->
64-
this.headerMap.put(ConfigUtils.extractPropertyLastElement(key), value));
58+
this.headerMap = HttpHeaderUtils.prepareHeaderMap(
59+
HttpConnectorConfigConstants.SINK_HEADER_PREFIX,
60+
properties,
61+
headerPreprocessor
62+
);
63+
this.headersAndValues = HttpHeaderUtils.toHeaderAndValueArray(this.headerMap);
6564

6665
// TODO Inject this via constructor when implementing a response processor.
6766
// Processor will be injected and it will wrap statusChecker implementation.

0 commit comments

Comments
 (0)