Skip to content

Commit d200f1d

Browse files
authored
HTTP-99 Add support for generic json and URL query creator (#149)
Signed-off-by: davidradl <david_radley@uk.ibm.com>
1 parent cc4967f commit d200f1d

20 files changed

+1036
-51
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## [Unreleased]
44

5+
- Added support for generic json and URL query creator
6+
57
- Retries support for source table:
68
- Auto retry on IOException and user-defined http codes - parameter `gid.connector.http.source.lookup.retry-codes`.
79
- Parameters `gid.connector.http.source.lookup.error.code.exclude"` and `gid.connector.http.source.lookup.error.code` were replaced by `gid.connector.http.source.lookup.ignored-response-codes`.
@@ -13,6 +15,8 @@
1315

1416
## [0.18.0] - 2025-01-15
1517

18+
### Fixed
19+
1620
- Ignore Eclipse files in .gitignore
1721
- Support Flink 1.20
1822

README.md

Lines changed: 45 additions & 36 deletions
Large diffs are not rendered by default.

pom.xml

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,26 @@ under the License.
146146
<scope>provided</scope>
147147
</dependency>
148148

149+
<dependency>
150+
<groupId>org.apache.flink</groupId>
151+
<artifactId>flink-format-common</artifactId>
152+
<version>${flink.version}</version>
153+
<scope>provided</scope>
154+
</dependency>
155+
156+
<dependency>
157+
<groupId>org.apache.flink</groupId>
158+
<artifactId>flink-json</artifactId>
159+
<version>${flink.version}</version>
160+
<scope>provided</scope>
161+
</dependency>
162+
163+
<dependency>
164+
<groupId>com.fasterxml.jackson.core</groupId>
165+
<artifactId>jackson-core</artifactId>
166+
<version>${jackson.version}</version>
167+
</dependency>
168+
149169
<dependency>
150170
<groupId>com.fasterxml.jackson.core</groupId>
151171
<artifactId>jackson-databind</artifactId>
@@ -224,15 +244,6 @@ under the License.
224244
<scope>test</scope>
225245
</dependency>
226246

227-
<!-- Serializers for tests -->
228-
<dependency>
229-
<groupId>org.apache.flink</groupId>
230-
<artifactId>flink-json</artifactId>
231-
<version>${flink.version}</version>
232-
<scope>test</scope>
233-
</dependency>
234-
<!-- ***** -->
235-
236247
<dependency>
237248
<groupId>org.apache.flink</groupId>
238249
<artifactId>flink-runtime-web</artifactId>

src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
public interface LookupQueryCreatorFactory extends Factory, Serializable {
3838

3939
/**
40+
* @param readableConfig readable config
41+
* @param lookupRow lookup row
42+
* @param dynamicTableFactoryContext context
4043
* @return {@link LookupQueryCreator} custom lookup query creator instance
4144
*/
4245
LookupQueryCreator createLookupQueryCreator(

src/main/java/com/getindata/connectors/http/internal/table/lookup/RowDataSingleValueLookupSchemaEntry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public List<LookupArg> convertToLookupArg(RowData lookupKeyRow) {
4747
}
4848

4949
if (!(value instanceof BinaryStringData)) {
50-
log.debug("Unsupported Key Type {}. Trying simple toString(), wish me luck...",
50+
log.debug("Unsupported Key Type {}. Trying simple toString().",
5151
value.getClass());
5252
}
5353

src/main/java/com/getindata/connectors/http/internal/table/lookup/TableSourceHelper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public final class TableSourceHelper {
2222
* <p>Note: This method returns an empty list for every {@link DataType} that is not a
2323
* composite
2424
* type.
25+
* @param type logical type
26+
* @return List of field names
2527
*/
2628
public static List<String> getFieldNames(LogicalType type) {
2729

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
* © Copyright IBM Corp. 2025
3+
*/
4+
5+
package com.getindata.connectors.http.internal.table.lookup.querycreators;
6+
7+
import java.io.IOException;
8+
import java.io.UnsupportedEncodingException;
9+
import java.net.URLEncoder;
10+
import java.nio.charset.StandardCharsets;
11+
import java.util.*;
12+
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.apache.flink.annotation.VisibleForTesting;
15+
import org.apache.flink.api.common.serialization.SerializationSchema;
16+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
17+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
18+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
19+
import org.apache.flink.table.api.DataTypes.Field;
20+
import org.apache.flink.table.data.GenericRowData;
21+
import org.apache.flink.table.data.RowData;
22+
import org.apache.flink.table.types.DataType;
23+
import org.apache.flink.table.types.FieldsDataType;
24+
import org.apache.flink.types.Row;
25+
import org.apache.flink.util.FlinkRuntimeException;
26+
import org.apache.flink.util.Preconditions;
27+
28+
import com.getindata.connectors.http.LookupArg;
29+
import com.getindata.connectors.http.LookupQueryCreator;
30+
import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo;
31+
import com.getindata.connectors.http.internal.table.lookup.LookupRow;
32+
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
33+
34+
/**
35+
* Generic JSON and URL query creator; in addition to be able to map columns to json requests,
36+
* it allows url inserts to be mapped to column names using templating.
37+
* <br>
38+
* For GETs, column names are mapped to query parameters. e.g. for
39+
* <code>GenericJsonAndUrlQueryCreator.REQUEST_PARAM_FIELDS</code> = "id1;id2"
40+
* and url of http://base. At lookup time with values of id1=1 and id2=2 a call of
41+
* http/base?id1=1&amp;id2=2 will be issued.
42+
* <br>
43+
* For PUT and POST, parameters are mapped to the json body e.g. for
44+
* REQUEST_PARAM_FIELDS = "id1;id2" and url of http://base. At lookup time with values of id1=1 and
45+
* id2=2 as call of http/base will be issued with a json payload of {"id1":1,"id2":2}
46+
* <br>
47+
* For all http methods, url segments can be used to include lookup up values. Using the map from
48+
* <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key of the insert and the
49+
* value of the associated column.
50+
* e.g. for <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> = "key1":"col1"
51+
* and url of http://base/{key1}. At lookup time with values of col1="aaaa" a call of
52+
* http/base/aaaa will be issued.
53+
*
54+
*/
55+
@Slf4j
56+
public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
57+
private static final long serialVersionUID = 1L;
58+
59+
// not final so we can mutate for unit test
60+
private SerializationSchema<RowData> serializationSchema;
61+
private boolean schemaOpened = false;
62+
private LookupRow lookupRow;
63+
private final String httpMethod;
64+
private final List<String> requestQueryParamsFields;
65+
private final List<String> requestBodyFields;
66+
private final Map<String, String> requestUrlMap;
67+
68+
/**
69+
* Construct a Generic JSON and URL query creator.
70+
*
71+
* @param httpMethod the requested http method
72+
* @param serializationSchema serialization schema for RowData
73+
* @param requestQueryParamsFields query param fields
74+
* @param requestBodyFields body fields used for PUT and POSTs
75+
* @param requestUrlMap url map
76+
* @param lookupRow lookup row itself.
77+
*/
78+
public GenericJsonAndUrlQueryCreator(final String httpMethod,
79+
final SerializationSchema<RowData>
80+
serializationSchema,
81+
final List<String> requestQueryParamsFields,
82+
final List<String> requestBodyFields,
83+
final Map<String, String> requestUrlMap,
84+
final LookupRow lookupRow) {
85+
this.httpMethod = httpMethod;
86+
this.serializationSchema = serializationSchema;
87+
this.lookupRow = lookupRow;
88+
this.requestQueryParamsFields = requestQueryParamsFields;
89+
this.requestBodyFields = requestBodyFields;
90+
this.requestUrlMap = requestUrlMap;
91+
}
92+
@VisibleForTesting
93+
void setSerializationSchema(SerializationSchema<RowData>
94+
serializationSchema) {
95+
this.serializationSchema = serializationSchema;
96+
}
97+
98+
@Override
99+
public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
100+
this.checkOpened();
101+
102+
final String lookupQuery;
103+
Map<String, String> bodyBasedUrlQueryParams = new HashMap<>();
104+
final Collection<LookupArg> lookupArgs =
105+
lookupRow.convertToLookupArgs(lookupDataRow);
106+
ObjectNode jsonObject;
107+
try {
108+
jsonObject = (ObjectNode) ObjectMapperAdapter.instance().readTree(
109+
serializationSchema.serialize(lookupDataRow));
110+
} catch (IOException e) {
111+
String message = "Unable to parse the lookup arguments to json.";
112+
log.error(message, e);
113+
throw new RuntimeException(message, e);
114+
}
115+
// Parameters are encoded as query params for GET and none GET.
116+
// Later code will turn these query params into the body for PUTs and POSTs
117+
ObjectNode jsonObjectForQueryParams = ObjectMapperAdapter.instance().createObjectNode();
118+
for (String requestColumnName : this.requestQueryParamsFields) {
119+
jsonObjectForQueryParams.set(requestColumnName, jsonObject.get(requestColumnName));
120+
}
121+
// TODO can we convertToQueryParameters for all ops
122+
// and not use/deprecate bodyBasedUrlQueryParams
123+
if (httpMethod.equalsIgnoreCase("GET")) {
124+
// add the query parameters
125+
lookupQuery = convertToQueryParameters(jsonObjectForQueryParams,
126+
StandardCharsets.UTF_8.toString());
127+
} else {
128+
// Body-based queries
129+
// serialize to a string for the body.
130+
try {
131+
lookupQuery = ObjectMapperAdapter.instance()
132+
.writeValueAsString(jsonObject.retain(requestBodyFields));
133+
} catch (JsonProcessingException e) {
134+
final String message = "Unable to convert Json Object to a string";
135+
throw new RuntimeException(message,e);
136+
}
137+
// body parameters
138+
// use the request json object to scope the required fields and the lookupArgs as values
139+
bodyBasedUrlQueryParams = createBodyBasedParams(lookupArgs,
140+
jsonObjectForQueryParams);
141+
}
142+
// add the path map
143+
final Map<String, String> pathBasedUrlParams = createPathBasedParams(lookupArgs,
144+
requestUrlMap);
145+
146+
return new LookupQueryInfo(lookupQuery, bodyBasedUrlQueryParams, pathBasedUrlParams);
147+
148+
}
149+
150+
/**
151+
* Create a Row from a RowData and DataType
152+
* @param lookupRowData the lookup RowData
153+
* @param rowType the datatype
154+
* @return row return row
155+
*/
156+
@VisibleForTesting
157+
static Row rowDataToRow(final RowData lookupRowData, final DataType rowType) {
158+
Preconditions.checkNotNull(lookupRowData);
159+
Preconditions.checkNotNull(rowType);
160+
161+
final Row row = Row.withNames();
162+
final List<Field> rowFields = FieldsDataType.getFields(rowType);
163+
164+
for (int idx = 0; idx < rowFields.size(); idx++) {
165+
final String fieldName = rowFields.get(idx).getName();
166+
final Object fieldValue = ((GenericRowData) lookupRowData).getField(idx);
167+
row.setField(fieldName, fieldValue);
168+
}
169+
return row;
170+
}
171+
172+
/**
173+
* Create map of the json key to the lookup argument
174+
* value. This is used for body based content.
175+
* @param args lookup arguments
176+
* @param objectNode object node
177+
* @return map of field content to the lookup argument value.
178+
*/
179+
private Map<String, String> createBodyBasedParams(final Collection<LookupArg> args,
180+
ObjectNode objectNode ) {
181+
Map<String, String> mapOfJsonKeyToLookupArg = new LinkedHashMap<>();
182+
Iterator<Map.Entry<String, JsonNode>> iterator = objectNode.fields();
183+
iterator.forEachRemaining(field -> {
184+
for (final LookupArg arg : args) {
185+
if (arg.getArgName().equals(field.getKey())) {
186+
String keyForMap = field.getKey();
187+
mapOfJsonKeyToLookupArg.put(
188+
keyForMap, arg.getArgValue());
189+
}
190+
}
191+
});
192+
193+
return mapOfJsonKeyToLookupArg;
194+
}
195+
/**
196+
* Create map of the json key to the lookup argument
197+
* value. This is used for path based content.
198+
* @param args lookup arguments
199+
* @param urlMap map of insert name to column name
200+
* @return map of field content to the lookup argument value.
201+
*/
202+
private Map<String, String> createPathBasedParams(final Collection<LookupArg> args,
203+
Map<String, String> urlMap ) {
204+
Map<String, String> mapOfJsonKeyToLookupArg = new LinkedHashMap<>();
205+
if (urlMap != null) {
206+
for (String key: urlMap.keySet()) {
207+
for (final LookupArg arg : args) {
208+
if (arg.getArgName().equals(key)) {
209+
mapOfJsonKeyToLookupArg.put(
210+
urlMap.get(key), arg.getArgValue());
211+
}
212+
}
213+
}
214+
}
215+
return mapOfJsonKeyToLookupArg;
216+
}
217+
/**
218+
* Convert json object to query params string
219+
* @param jsonObject supplies json object
220+
* @param enc encoding string - used in unit test to drive unsupported encoding
221+
* @return query params string
222+
*/
223+
@VisibleForTesting
224+
static String convertToQueryParameters(final ObjectNode jsonObject, String enc) {
225+
Preconditions.checkNotNull(jsonObject);
226+
227+
final StringJoiner result = new StringJoiner("&");
228+
jsonObject.fields().forEachRemaining(field -> {
229+
final String fieldName = field.getKey();
230+
final String fieldValue = field.getValue().asText();
231+
232+
try {
233+
result.add(fieldName + "="
234+
+ URLEncoder.encode(fieldValue, enc));
235+
} catch (UnsupportedEncodingException e) {
236+
final String message =
237+
"Failed to encode the value of the query parameter name "
238+
+ fieldName
239+
+ ": "
240+
+ fieldValue;
241+
throw new RuntimeException(message, e);
242+
}
243+
});
244+
245+
return result.toString();
246+
}
247+
248+
private void checkOpened() {
249+
if (!this.schemaOpened) {
250+
try {
251+
this.serializationSchema.open(
252+
SerializationSchemaUtils
253+
.createSerializationInitContext(
254+
GenericJsonAndUrlQueryCreator.class));
255+
this.schemaOpened = true;
256+
} catch (final Exception e) {
257+
final String message =
258+
"Failed to initialize serialization schema for "
259+
+ GenericJsonAndUrlQueryCreator.class;
260+
log.error(message, e);
261+
throw new FlinkRuntimeException(message, e);
262+
}
263+
}
264+
}
265+
}

0 commit comments

Comments
 (0)