Skip to content

Commit d75e18d

Browse files
OlivierZembrikristoffSC
authored andcommitted
HTTP-92 Customization of HTTP lookup source logger
Customization of HTTP lookup source logger Signed-off-by: Olivier Zembri <<ozembri@fr.ibm.com>>
1 parent a1e485e commit d75e18d

16 files changed

+240
-16
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
## [Unreleased]
44

5+
### Added
6+
7+
- Added support for optionally using a custom SLF4J logger to trace HTTP lookup queries.
8+
New configuration parameter: `gid.connector.http.source.lookup.request-callback` with default value
9+
`slf4j-lookup-logger`. If this parameter is not provided then the default SLF4J logger
10+
[Slf4JHttpLookupPostRequestCallback](https://github.com/getindata/flink-http-connector/blob/main/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java)
11+
is used instead.
12+
513
## [0.13.0] - 2024-04-03
614

715
### Added

README.md

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -338,18 +338,43 @@ CREATE TABLE http (
338338
```
339339

340340
#### Custom request/response callback
341-
Http Sink processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the
341+
342+
- Http Sink processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the
342343
behaviour of the additional stage of processing done by Table API Sink by implementing
343344
[HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) and
344345
[HttpPostRequestCallbackFactory](src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java)
345-
interfaces. Custom implementations of `HttpSinkRequestCallbackFactory` can be registered along other factories in
346-
`resources/META-INF.services/org.apache.flink.table.factories.Factory` file and then referenced by their identifiers in
346+
interfaces. Custom implementations of `HttpPostRequestCallbackFactory<HttpRequest>` can be registered along other factories in
347+
`resources/META-INF/services/org.apache.flink.table.factories.Factory` file and then referenced by their identifiers in
347348
the HttpSink DDL property field `gid.connector.http.sink.request-callback`.
348349

349-
A default implementation that logs those pairs as *INFO* level logs using Slf4j
350+
For example, one can create a class `CustomHttpSinkPostRequestCallbackFactory` with a unique identifier, say `rest-sink-logger`,
351+
that implements interface `HttpPostRequestCallbackFactory<HttpRequest>` to create a new instance of a custom callback
352+
`CustomHttpSinkPostRequestCallback`. This factory can be registered along other factories by appending the fully-qualified name
353+
of class `CustomHttpSinkPostRequestCallbackFactory` in `resources/META-INF/services/org.apache.flink.table.factories.Factory` file
354+
and then reference identifier `rest-sink-logger` in the HttpSink DDL property field `gid.connector.http.sink.request-callback`.
355+
356+
A default implementation that logs those pairs as *INFO* level logs using Slf4j
350357
([Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java))
351358
is provided.
352359

360+
361+
- Http Lookup Source processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the
362+
behaviour of the additional stage of processing done by Table Function API by implementing
363+
[HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) and
364+
[HttpPostRequestCallbackFactory](src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java)
365+
interfaces.
366+
367+
For example, one can create a class `CustomHttpLookupPostRequestCallbackFactory` with a unique identifier, say `rest-lookup-logger`,
368+
that implements interface `HttpPostRequestCallbackFactory<HttpLookupSourceRequestEntry>` to create a new instance of a custom callback
369+
`CustomHttpLookupPostRequestCallback`. This factory can be registered along other factories by appending the fully-qualified name
370+
of class `CustomHttpLookupPostRequestCallbackFactory` in `resources/META-INF/services/org.apache.flink.table.factories.Factory` file
371+
and then reference identifier `rest-lookup-logger` in the HTTP lookup DDL property field `gid.connector.http.source.lookup.request-callback`.
372+
373+
A default implementation that logs those pairs as *INFO* level logs using Slf4j
374+
([Slf4JHttpLookupPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java))
375+
is provided.
376+
377+
353378
## HTTP status code handler
354379
Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors.
355380
By default all 400s and 500s response codes will be interpreted as error code.
@@ -409,6 +434,7 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
409434
| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
410435
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
411436
| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
437+
| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. |
412438

413439
### HTTP Sink
414440
| Option | Required | Description/Value |

src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,27 @@
22

33
import org.apache.flink.table.factories.Factory;
44

5+
import com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSource;
56
import com.getindata.connectors.http.internal.table.sink.HttpDynamicSink;
67

78
/**
89
* The {@link Factory} that dynamically creates and injects {@link HttpPostRequestCallback} to
9-
* {@link HttpDynamicSink}.
10+
* {@link HttpDynamicSink} and {@link HttpLookupTableSource}.
1011
*
1112
* <p>Custom implementations of {@link HttpPostRequestCallbackFactory} can be registered along
1213
* other factories in
13-
* <pre>resources/META-INF.services/org.apache.flink.table.factories.Factory</pre>
14-
* file and then referenced by their identifiers in the HttpSink DDL property field
15-
* <i>gid.connector.http.sink.request-callback</i>.
14+
* <pre>resources/META-INF/services/org.apache.flink.table.factories.Factory</pre>
15+
* file and then referenced by their identifiers in:
16+
* <li>
17+
* The HttpSink DDL property field <i>gid.connector.http.sink.request-callback</i>
18+
* for HTTP sink.
19+
* </li>
20+
* <li>
21+
* The Http lookup DDL property field <i>gid.connector.http.source.lookup.request-callback</i>
22+
* for HTTP lookup.
23+
* </li>
24+
*
25+
* <br />
1626
*
1727
* <p>The following example shows the minimum Table API example to create a {@link HttpDynamicSink}
1828
* that uses a custom callback created by a factory that returns <i>my-callback</i> as its
@@ -30,8 +40,24 @@
3040
* )
3141
* }</pre>
3242
*
43+
* <p>The following example shows the minimum Table API example to create a
44+
* {@link HttpLookupTableSource} that uses a custom callback created by a factory that
45+
* returns <i>my-callback</i> as its identifier.
46+
*
47+
* <pre>{@code
48+
* CREATE TABLE httplookup (
49+
* id bigint
50+
* ) with (
51+
* 'connector' = 'rest-lookup',
52+
* 'url' = 'http://example.com/myendpoint',
53+
* 'format' = 'json',
54+
* 'gid.connector.http.source.lookup.request-callback' = 'my-callback'
55+
* )
56+
* }</pre>
57+
*
3358
* @param <RequestT> type of the HTTP request wrapper
3459
*/
60+
3561
public interface HttpPostRequestCallbackFactory<RequestT> extends Factory {
3662
/**
3763
* @return {@link HttpPostRequestCallback} custom request callback instance

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ public final class HttpConnectorConfigConstants {
4848
GID_CONNECTOR_HTTP + "source.lookup.error.code";
4949
// -----------------------------------------------------
5050

51+
public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
52+
GID_CONNECTOR_HTTP + "source.lookup.request-callback";
53+
5154
public static final String SINK_REQUEST_CALLBACK_IDENTIFIER =
5255
GID_CONNECTOR_HTTP + "sink.request-callback";
5356

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import org.apache.flink.configuration.Configuration;
1010
import org.apache.flink.configuration.ReadableConfig;
1111

12+
import com.getindata.connectors.http.HttpPostRequestCallback;
13+
1214
@Builder
1315
@Data
1416
@RequiredArgsConstructor
@@ -26,4 +28,6 @@ public class HttpLookupConfig implements Serializable {
2628

2729
@Builder.Default
2830
private final ReadableConfig readableConfig = new Configuration();
31+
32+
private final HttpPostRequestCallback<HttpLookupSourceRequestEntry> httpPostRequestCallback;
2933
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW;
77
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER;
8+
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER;
89

910
public class HttpLookupConnectorOptions {
1011

@@ -47,4 +48,9 @@ public class HttpLookupConnectorOptions {
4748
.booleanType()
4849
.defaultValue(false)
4950
.withDescription("Whether to use the raw value of Authorization header");
51+
52+
public static final ConfigOption<String> REQUEST_CALLBACK_IDENTIFIER =
53+
ConfigOptions.key(SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER)
54+
.stringType()
55+
.defaultValue(Slf4jHttpLookupPostRequestCallbackFactory.IDENTIFIER);
5056
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.apache.flink.table.api.DataTypes.FIELD;
2424
import static org.apache.flink.table.types.utils.DataTypeUtils.removeTimeAttribute;
2525

26+
import com.getindata.connectors.http.HttpPostRequestCallbackFactory;
2627
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
2728
import com.getindata.connectors.http.internal.utils.ConfigUtils;
2829
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;
@@ -88,20 +89,29 @@ public Set<ConfigOption<?>> requiredOptions() {
8889

8990
@Override
9091
public Set<ConfigOption<?>> optionalOptions() {
91-
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD);
92+
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD, REQUEST_CALLBACK_IDENTIFIER);
9293
}
9394

9495
private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {
9596

9697
Properties httpConnectorProperties =
9798
ConfigUtils.getHttpConnectorProperties(context.getCatalogTable().getOptions());
9899

100+
final HttpPostRequestCallbackFactory<HttpLookupSourceRequestEntry>
101+
postRequestCallbackFactory =
102+
FactoryUtil.discoverFactory(
103+
context.getClassLoader(),
104+
HttpPostRequestCallbackFactory.class,
105+
readableConfig.get(REQUEST_CALLBACK_IDENTIFIER)
106+
);
107+
99108
return HttpLookupConfig.builder()
100109
.lookupMethod(readableConfig.get(LOOKUP_METHOD))
101110
.url(readableConfig.get(URL))
102111
.useAsync(readableConfig.get(ASYNC_POLLING))
103112
.properties(httpConnectorProperties)
104113
.readableConfig(readableConfig)
114+
.httpPostRequestCallback(postRequestCallbackFactory.createHttpPostRequestCallback())
105115
.build();
106116
}
107117

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ public JavaNetHttpPollingClient(
4747
this.responseBodyDecoder = responseBodyDecoder;
4848
this.requestFactory = requestFactory;
4949

50-
// TODO inject same way as it is done for Sink
51-
this.httpPostRequestCallback = new Slf4JHttpLookupPostRequestCallback();
50+
this.httpPostRequestCallback = options.getHttpPostRequestCallback();
5251

5352
// TODO Inject this via constructor when implementing a response processor.
5453
// Processor will be injected and it will wrap statusChecker implementation.
@@ -92,22 +91,21 @@ private Optional<RowData> processHttpResponse(
9291
this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());
9392

9493
if (response == null) {
95-
log.warn("Null Http response for request " + request.getHttpRequest().uri().toString());
9694
return Optional.empty();
9795
}
9896

9997
String responseBody = response.body();
10098
int statusCode = response.statusCode();
10199

102-
log.debug("Received {} status code for RestTableSource Request", statusCode);
100+
log.debug("Received status code [%s] for RestTableSource request " +
101+
"with Server response body [%s] ", statusCode, responseBody);
102+
103103
if (notErrorCodeAndNotEmptyBody(responseBody, statusCode)) {
104-
log.trace("Server response body" + responseBody);
105104
return Optional.ofNullable(responseBodyDecoder.deserialize(responseBody.getBytes()));
106105
} else {
107106
log.warn(
108107
String.format("Returned Http status code was invalid or returned body was empty. "
109-
+ "Status Code [%s], "
110-
+ "response body [%s]", statusCode, responseBody)
108+
+ "Status Code [%s]", statusCode)
111109
);
112110

113111
return Optional.empty();

src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public void call(
4343
}
4444

4545
if (response == null) {
46+
log.warn("Null Http response for request " + httpRequest.uri().toString());
47+
4648
log.info(
4749
"Got response for a request.\n Request:\n URL: {}\n " +
4850
"Method: {}\n Headers: {}\n Params/Body: {}\nResponse: null",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.getindata.connectors.http.internal.table.lookup;
2+
3+
import java.util.HashSet;
4+
import java.util.Set;
5+
6+
import org.apache.flink.configuration.ConfigOption;
7+
8+
import com.getindata.connectors.http.HttpPostRequestCallback;
9+
import com.getindata.connectors.http.HttpPostRequestCallbackFactory;
10+
11+
/**
12+
* Factory for creating {@link Slf4JHttpLookupPostRequestCallback}.
13+
*/
14+
public class Slf4jHttpLookupPostRequestCallbackFactory
15+
implements HttpPostRequestCallbackFactory<HttpLookupSourceRequestEntry> {
16+
17+
public static final String IDENTIFIER = "slf4j-lookup-logger";
18+
19+
@Override
20+
public HttpPostRequestCallback<HttpLookupSourceRequestEntry> createHttpPostRequestCallback() {
21+
return new Slf4JHttpLookupPostRequestCallback();
22+
}
23+
24+
@Override
25+
public String factoryIdentifier() {
26+
return IDENTIFIER;
27+
}
28+
29+
@Override
30+
public Set<ConfigOption<?>> requiredOptions() {
31+
return new HashSet<>();
32+
}
33+
34+
@Override
35+
public Set<ConfigOption<?>> optionalOptions() {
36+
return new HashSet<>();
37+
}
38+
}

src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory
22
com.getindata.connectors.http.internal.table.lookup.querycreators.ElasticSearchLiteQueryCreatorFactory
33
com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreatorFactory
44
com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreatorFactory
5+
com.getindata.connectors.http.internal.table.lookup.Slf4jHttpLookupPostRequestCallbackFactory
56
com.getindata.connectors.http.internal.table.sink.HttpDynamicTableSinkFactory
67
com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallbackFactory

0 commit comments

Comments
 (0)