Skip to content

Commit 3c57b39

Browse files
HTTP-74 Add path parameter support (#84)
Signed-off-by: David Radley <david_radley@uk.ibm.com> Co-authored-by: David Radley <39792797+davidradl@users.noreply.github.com>
1 parent 93b0358 commit 3c57b39

File tree

3 files changed

+167
-2
lines changed

3 files changed

+167
-2
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## [Unreleased]
44

5+
### Added
6+
7+
- Added support for using the result of a lookup join operation in a subsequent select query that adds
8+
or removes columns (project pushdown operation).
9+
510
### Changed
611

712
- Changed [LookupQueryInfo](src/main/java/com/getindata/connectors/http/internal/table/lookup/LookupQueryInfo.java)

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.flink.configuration.ReadableConfig;
99
import org.apache.flink.table.api.DataTypes;
1010
import org.apache.flink.table.api.DataTypes.Field;
11+
import org.apache.flink.table.connector.Projection;
1112
import org.apache.flink.table.connector.format.DecodingFormat;
1213
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
1314
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -38,7 +39,7 @@
3839
public class HttpLookupTableSource
3940
implements LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown {
4041

41-
private final DataType physicalRowDataType;
42+
private DataType physicalRowDataType;
4243

4344
private final HttpLookupConfig lookupConfig;
4445

@@ -58,6 +59,11 @@ public HttpLookupTableSource(
5859
this.dynamicTableFactoryContext = dynamicTablecontext;
5960
}
6061

62+
@Override
63+
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
64+
physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType);
65+
}
66+
6167
@Override
6268
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
6369

@@ -127,7 +133,7 @@ public void applyLimit(long limit) {
127133

128134
@Override
129135
public boolean supportsNestedProjection() {
130-
return false;
136+
return true;
131137
}
132138

133139
private PollingClientFactory<RowData> createPollingClientFactory(

src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,160 @@ public void testHttpsMTlsLookupJoin() throws Exception {
243243
assertEnrichedRows(rows);
244244
}
245245

246+
@Test
247+
public void testLookupJoinProjectionPushDown() throws Exception {
248+
249+
// GIVEN
250+
setUpServerBodyStub(
251+
"POST",
252+
wireMockServer,
253+
List.of(
254+
matchingJsonPath("$.row.aStringColumn"),
255+
matchingJsonPath("$.row.anIntColumn"),
256+
matchingJsonPath("$.row.aFloatColumn")
257+
)
258+
);
259+
260+
String fields =
261+
"`row` ROW<`aStringColumn` STRING, `anIntColumn` INT, `aFloatColumn` FLOAT>\n";
262+
263+
String sourceTable =
264+
"CREATE TABLE Orders (\n"
265+
+ " proc_time AS PROCTIME(),\n"
266+
+ " id STRING,\n"
267+
+ fields
268+
+ ") WITH ("
269+
+ "'connector' = 'datagen',"
270+
+ "'rows-per-second' = '1',"
271+
+ "'fields.id.kind' = 'sequence',"
272+
+ "'fields.id.start' = '1',"
273+
+ "'fields.id.end' = '5'"
274+
+ ")";
275+
276+
String lookupTable =
277+
"CREATE TABLE Customers (\n" +
278+
" `enrichedInt` INT,\n" +
279+
" `enrichedString` STRING,\n" +
280+
" \n"
281+
+ fields
282+
+ ") WITH ("
283+
+ "'format' = 'json',"
284+
+ "'lookup-request.format' = 'json',"
285+
+ "'lookup-request.format.json.fail-on-missing-field' = 'true',"
286+
+ "'connector' = 'rest-lookup',"
287+
+ "'lookup-method' = 'POST',"
288+
+ "'url' = 'http://localhost:9090/client',"
289+
+ "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',"
290+
+ "'asyncPolling' = 'true'"
291+
+ ")";
292+
293+
tEnv.executeSql(sourceTable);
294+
tEnv.executeSql(lookupTable);
295+
296+
// WHEN
297+
// SQL query that performs JOIN on both tables.
298+
String joinQuery =
299+
"CREATE TEMPORARY VIEW lookupResult AS " +
300+
"SELECT o.id, o.`row`, c.enrichedInt, c.enrichedString FROM Orders AS o"
301+
+ " JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c"
302+
+ " ON (\n"
303+
+ " o.`row` = c.`row`\n"
304+
+ ")";
305+
306+
tEnv.executeSql(joinQuery);
307+
308+
// SQL query that performs a projection pushdown to limit the number of columns
309+
String lastQuery =
310+
"SELECT r.id, r.enrichedInt FROM lookupResult r;";
311+
312+
TableResult result = tEnv.executeSql(lastQuery);
313+
result.await(15, TimeUnit.SECONDS);
314+
315+
// THEN
316+
SortedSet<Row> collectedRows = getCollectedRows(result);
317+
318+
collectedRows.stream().forEach(row -> assertThat(row.getArity()).isEqualTo(2));
319+
320+
assertThat(collectedRows.size()).isEqualTo(5);
321+
}
322+
323+
@Test
324+
public void testLookupJoinProjectionPushDownNested() throws Exception {
325+
326+
// GIVEN
327+
setUpServerBodyStub(
328+
"POST",
329+
wireMockServer,
330+
List.of(
331+
matchingJsonPath("$.row.aStringColumn"),
332+
matchingJsonPath("$.row.anIntColumn"),
333+
matchingJsonPath("$.row.aFloatColumn")
334+
)
335+
);
336+
337+
String fields =
338+
"`row` ROW<`aStringColumn` STRING, `anIntColumn` INT, `aFloatColumn` FLOAT>\n";
339+
340+
String sourceTable =
341+
"CREATE TABLE Orders (\n"
342+
+ " proc_time AS PROCTIME(),\n"
343+
+ " id STRING,\n"
344+
+ fields
345+
+ ") WITH ("
346+
+ "'connector' = 'datagen',"
347+
+ "'rows-per-second' = '1',"
348+
+ "'fields.id.kind' = 'sequence',"
349+
+ "'fields.id.start' = '1',"
350+
+ "'fields.id.end' = '5'"
351+
+ ")";
352+
353+
String lookupTable =
354+
"CREATE TABLE Customers (\n" +
355+
" `enrichedInt` INT,\n" +
356+
" `enrichedString` STRING,\n" +
357+
" \n"
358+
+ fields
359+
+ ") WITH ("
360+
+ "'format' = 'json',"
361+
+ "'lookup-request.format' = 'json',"
362+
+ "'lookup-request.format.json.fail-on-missing-field' = 'true',"
363+
+ "'connector' = 'rest-lookup',"
364+
+ "'lookup-method' = 'POST',"
365+
+ "'url' = 'http://localhost:9090/client',"
366+
+ "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',"
367+
+ "'asyncPolling' = 'true'"
368+
+ ")";
369+
370+
tEnv.executeSql(sourceTable);
371+
tEnv.executeSql(lookupTable);
372+
373+
// WHEN
374+
// SQL query that performs JOIN on both tables.
375+
String joinQuery =
376+
"CREATE TEMPORARY VIEW lookupResult AS " +
377+
"SELECT o.id, o.`row`, c.enrichedInt, c.enrichedString FROM Orders AS o"
378+
+ " JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c"
379+
+ " ON (\n"
380+
+ " o.`row` = c.`row`\n"
381+
+ ")";
382+
383+
tEnv.executeSql(joinQuery);
384+
385+
// SQL query that performs a project pushdown to take a subset of columns with nested value
386+
String lastQuery =
387+
"SELECT r.id, r.enrichedInt, r.`row`.aStringColumn FROM lookupResult r;";
388+
389+
TableResult result = tEnv.executeSql(lastQuery);
390+
result.await(15, TimeUnit.SECONDS);
391+
392+
// THEN
393+
SortedSet<Row> collectedRows = getCollectedRows(result);
394+
395+
collectedRows.stream().forEach(row -> assertThat(row.getArity()).isEqualTo(3));
396+
397+
assertThat(collectedRows.size()).isEqualTo(5);
398+
}
399+
246400
@Test
247401
public void testLookupJoinOnRowType() throws Exception {
248402

0 commit comments

Comments
 (0)