Skip to content

Commit a7ddf6b

Browse files
kristoffSCKrzysztof Chmielewski
andauthored
Drop Dependency for Apache HttpClient (#17)
* Drop_apacheHttpClient_Dependency - drop dependency Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com> * Drop_apacheHttpClient_Dependency - Licence update for httpcomponents-client code. Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com> * Drop_apacheHttpClient_Dependency - Update README.md Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com> * Drop_apacheHttpClient_Dependency - Changes after code review. Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com> Co-authored-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
1 parent b2c6371 commit a7ddf6b

20 files changed

+1718
-37
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99
- Add new properties `gid.connector.http.sink.error.code` and `gid.connector.http.sink.error.code.exclude`
1010
to set HTTP status code that should be interpreted as errors.
1111

12+
### Changed
13+
- Change dependency scope for `org.apache.flink.flink-connector-base` from `compile` to `provided`.
14+
15+
### Removed
16+
- Remove dependency on `org.apache.httpcomponents.httpclient`from production code. Dependency is only for test scope.
17+
1218
## [0.3.0] - 2022-07-21
1319

1420
- Package refactoring. Hide internal classes that does not have to be used by API users under "internal" package.

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ Currently, HTTP TableLookup connector supports only Lookup Joins [1] and expects
1818
* Maven 3
1919
* Flink 1.15+
2020

21+
## Runtime dependencies
22+
This connector has few Flink's runtime dependencies, that are expected to be provided.
23+
* `org.apache.flink.flink-java`
24+
* `org.apache.flink.flink-clients`
25+
* `org.apache.flink.flink-connector-base`
26+
* `org.apache.flink.flink-java`
27+
2128
## Installation
2229

2330
In order to use the `flink-http-connector` the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. For build automation tool reference, look into Maven Central: [https://mvnrepository.com/artifact/com.getindata/flink-http-connector](https://mvnrepository.com/artifact/com.getindata/flink-http-connector).

pom.xml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ under the License.
148148
<groupId>org.apache.flink</groupId>
149149
<artifactId>flink-connector-base</artifactId>
150150
<version>${flink.version}</version>
151+
<scope>provided</scope>
151152
</dependency>
152153

153154
<!--===== Needed Dep ====-->
@@ -157,12 +158,6 @@ under the License.
157158
<version>${json.path.version}</version>
158159
</dependency>
159160

160-
<dependency>
161-
<groupId>org.apache.httpcomponents</groupId>
162-
<artifactId>httpclient</artifactId>
163-
<version>4.5.13</version>
164-
</dependency>
165-
166161
<dependency>
167162
<groupId>org.projectlombok</groupId>
168163
<artifactId>lombok</artifactId>
@@ -171,6 +166,13 @@ under the License.
171166
</dependency>
172167

173168
<!--TEST-->
169+
<dependency>
170+
<groupId>org.apache.httpcomponents</groupId>
171+
<artifactId>httpclient</artifactId>
172+
<version>4.5.13</version>
173+
<scope>test</scope>
174+
</dependency>
175+
174176
<dependency>
175177
<groupId>org.apache.flink</groupId>
176178
<artifactId>flink-table-common</artifactId>

src/main/java/com/getindata/connectors/http/internal/JsonResultTableConverter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.getindata.connectors.http.internal;
22

3-
43
import java.util.ArrayList;
54
import java.util.HashMap;
65
import java.util.List;

src/main/java/com/getindata/connectors/http/internal/config/ConfigException.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ public ConfigException(String message, Throwable t) {
1616
super(message, t);
1717
}
1818

19+
/**
20+
* Creates an exception object using predefined exception message template:
21+
* {@code Invalid value + (value) + for configuration + (property name) + (additional message) }
22+
* @param name configuration property name.
23+
* @param value configuration property value.
24+
* @param message custom message appended to the end of exception message.
25+
*/
1926
public ConfigException(String name, Object value, String message) {
2027
super("Invalid value " + value + " for configuration " + name + (message == null ? ""
2128
: ": " + message));

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,35 +27,25 @@
2727
public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequestEntry> {
2828

2929
private final String endpointUrl;
30+
3031
private final SinkHttpClient sinkHttpClient;
31-
private final Counter numRecordsSendErrorsCounter;
3232

33-
public HttpSinkWriter(
34-
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter, Sink.InitContext context,
35-
int maxBatchSize,
36-
int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes,
37-
long maxTimeInBufferMS,
38-
long maxRecordSizeInBytes, String endpointUrl, SinkHttpClient sinkHttpClient
39-
) {
40-
this(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
41-
maxBatchSizeInBytes,
42-
maxTimeInBufferMS, maxRecordSizeInBytes, endpointUrl, sinkHttpClient,
43-
Collections.emptyList()
44-
);
45-
}
33+
private final Counter numRecordsSendErrorsCounter;
4634

4735
public HttpSinkWriter(
48-
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter, Sink.InitContext context,
49-
int maxBatchSize,
50-
int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes,
51-
long maxTimeInBufferMS,
52-
long maxRecordSizeInBytes, String endpointUrl, SinkHttpClient sinkHttpClient,
53-
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates
54-
) {
36+
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
37+
Sink.InitContext context,
38+
int maxBatchSize,
39+
int maxInFlightRequests,
40+
int maxBufferedRequests,
41+
long maxBatchSizeInBytes,
42+
long maxTimeInBufferMS,
43+
long maxRecordSizeInBytes,
44+
String endpointUrl,
45+
SinkHttpClient sinkHttpClient,
46+
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates) {
5547
super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
56-
maxBatchSizeInBytes,
57-
maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates
58-
);
48+
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
5949
this.endpointUrl = endpointUrl;
6050
this.sinkHttpClient = sinkHttpClient;
6151

@@ -67,13 +57,13 @@ public HttpSinkWriter(
6757
@Override
6858
protected void submitRequestEntries(
6959
List<HttpSinkRequestEntry> requestEntries,
70-
Consumer<List<HttpSinkRequestEntry>> requestResult
71-
) {
60+
Consumer<List<HttpSinkRequestEntry>> requestResult) {
7261
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
7362
future.whenComplete((response, err) -> {
7463
if (err != null) {
75-
var failedRequestsNumber = requestEntries.size();
76-
log.error("Http Sink fatally failed to write all {} requests",
64+
int failedRequestsNumber = requestEntries.size();
65+
log.error(
66+
"Http Sink fatally failed to write all {} requests",
7767
failedRequestsNumber);
7868
numRecordsSendErrorsCounter.inc(failedRequestsNumber);
7969

@@ -83,7 +73,7 @@ protected void submitRequestEntries(
8373
// a clear image how we want to do it, so it would be both efficient and correct.
8474
//requestResult.accept(requestEntries);
8575
} else if (response.getFailedRequests().size() > 0) {
86-
var failedRequestsNumber = response.getFailedRequests().size();
76+
int failedRequestsNumber = response.getFailedRequests().size();
8777
log.error("Http Sink failed to write and will retry {} requests",
8878
failedRequestsNumber);
8979
numRecordsSendErrorsCounter.inc(failedRequestsNumber);

src/main/java/com/getindata/connectors/http/internal/table/lookup/RestTablePollingClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@
1212
import lombok.RequiredArgsConstructor;
1313
import lombok.extern.slf4j.Slf4j;
1414
import org.apache.flink.table.data.RowData;
15-
import org.apache.http.client.utils.URIBuilder;
1615

1716
import com.getindata.connectors.http.internal.JsonResultTableConverter;
1817
import com.getindata.connectors.http.internal.PollingClient;
18+
import com.getindata.connectors.http.internal.utils.uri.URIBuilder;
1919

2020
@Slf4j
2121
@RequiredArgsConstructor
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
* ============================= NOTE =================================
27+
* This code has been copied from
28+
* https://github.com/apache/httpcomponents-client/tree/rel/v4.5.13
29+
* and it was changed to use in this project.
30+
* ====================================================================
31+
*/
32+
33+
package com.getindata.connectors.http.internal.utils.uri;
34+
35+
import java.io.Serializable;
36+
import java.nio.CharBuffer;
37+
38+
import org.apache.flink.util.Preconditions;
39+
40+
/**
41+
* A resizable char array.
42+
*/
43+
final class CharArrayBuffer implements CharSequence, Serializable {
44+
45+
private static final long serialVersionUID = -6208952725094867135L;
46+
47+
private char[] buffer;
48+
49+
private int len;
50+
51+
/**
52+
* Creates an instance of {@link CharArrayBuffer} with the given initial capacity.
53+
*
54+
* @param capacity the capacity
55+
*/
56+
CharArrayBuffer(final int capacity) {
57+
super();
58+
Preconditions.checkArgument(capacity > 0, "Buffer capacity must be bigger than 0.");
59+
this.buffer = new char[capacity];
60+
}
61+
62+
/**
63+
* Appends chars of the given string to this buffer. The capacity of the buffer is increased, if
64+
* necessary, to accommodate all chars.
65+
*
66+
* @param str the string.
67+
*/
68+
void append(final String str) {
69+
final String s = str != null ? str : "null";
70+
final int strLen = s.length();
71+
final int newLen = this.len + strLen;
72+
if (newLen > this.buffer.length) {
73+
expand(newLen);
74+
}
75+
s.getChars(0, strLen, this.buffer, this.len);
76+
this.len = newLen;
77+
}
78+
79+
/**
80+
* Returns the {@code char} value in this buffer at the specified index. The index argument must
81+
* be greater than or equal to {@code 0}, and less than the length of this buffer.
82+
*
83+
* @param i the index of the desired char value.
84+
* @return the char value at the specified index.
85+
* @throws IndexOutOfBoundsException if {@code index} is negative or greater than or equal to
86+
* {@link #length()}.
87+
*/
88+
@Override
89+
public char charAt(final int i) {
90+
return this.buffer[i];
91+
}
92+
93+
/**
94+
* Returns the length of the buffer (char count).
95+
*
96+
* @return the length of the buffer
97+
*/
98+
@Override
99+
public int length() {
100+
return this.len;
101+
}
102+
103+
@Override
104+
public CharSequence subSequence(final int beginIndex, final int endIndex) {
105+
if (beginIndex < 0) {
106+
throw new IndexOutOfBoundsException("Negative beginIndex: " + beginIndex);
107+
}
108+
if (endIndex > this.len) {
109+
throw new IndexOutOfBoundsException("endIndex: " + endIndex + " > length: " + this.len);
110+
}
111+
if (beginIndex > endIndex) {
112+
throw new IndexOutOfBoundsException(
113+
"beginIndex: " + beginIndex + " > endIndex: " + endIndex);
114+
}
115+
return CharBuffer.wrap(this.buffer, beginIndex, endIndex);
116+
}
117+
118+
private void expand(final int newLen) {
119+
final char[] newBuffer = new char[Math.max(this.buffer.length << 1, newLen)];
120+
System.arraycopy(this.buffer, 0, newBuffer, 0, this.len);
121+
this.buffer = newBuffer;
122+
}
123+
124+
@Override
125+
public String toString() {
126+
return new String(this.buffer, 0, this.len);
127+
}
128+
129+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
* ============================= NOTE =================================
27+
* This code has been copied from
28+
* https://github.com/apache/httpcomponents-client/tree/rel/v4.5.13
29+
* and it was changed to use in this project.
30+
* ====================================================================
31+
*/
32+
33+
package com.getindata.connectors.http.internal.utils.uri;
34+
35+
import lombok.Data;
36+
import org.apache.flink.util.Preconditions;
37+
38+
@Data
39+
class NameValuePair {
40+
41+
private final String name;
42+
43+
private final String value;
44+
45+
/**
46+
* Default Constructor taking a name and a value. The value may be null.
47+
*
48+
* @param name The name.
49+
* @param value The value.
50+
*/
51+
NameValuePair(final String name, final String value) {
52+
super();
53+
this.name = Preconditions.checkNotNull(name, "Name may not be null");
54+
this.value = value;
55+
}
56+
}

0 commit comments

Comments
 (0)