Skip to content

Commit e98effc

Browse files
author
Krzysztof Chmielewski
committed
ESP-98ESP-98_SinkParameters - add properties to Table API.
Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
1 parent 42471ac commit e98effc

File tree

2 files changed

+48
-1
lines changed

2 files changed

+48
-1
lines changed

src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.flink.table.connector.sink.DynamicTableSink;
1212
import org.apache.flink.table.factories.FactoryUtil;
1313

14+
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
1415
import com.getindata.connectors.http.internal.utils.ConfigUtils;
1516
import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.INSERT_METHOD;
1617
import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.URL;
@@ -33,7 +34,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
3334
ReadableConfig tableOptions = factoryContext.getTableOptions();
3435

3536
// Validate configuration
36-
FactoryUtil.createTableFactoryHelper(this, context).validate();
37+
FactoryUtil.createTableFactoryHelper(this, context)
38+
.validateExcept(HttpConnectorConfigConstants.GID_CONNECTOR_HTTP);
3739
validateHttpSinkOptions(tableOptions);
3840
Properties asyncSinkProperties =
3941
new AsyncSinkConfigurationValidator(tableOptions).getValidatedConfigurations();

src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkInsertTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.HashSet;
44
import java.util.Set;
5+
import java.util.concurrent.ExecutionException;
56

67
import com.github.tomakehurst.wiremock.WireMockServer;
78
import com.github.tomakehurst.wiremock.http.RequestMethod;
@@ -162,4 +163,48 @@ public void testHttpDynamicSinkRawFormat() throws Exception {
162163
assertEquals(RequestMethod.POST, request.getMethod());
163164
assertEquals("application/octet-stream", request.getHeader("Content-Type"));
164165
}
166+
167+
@Test
168+
public void testHttpRequestWithHeadersFromDdl()
169+
throws ExecutionException, InterruptedException {
170+
String originHeaderValue = "*";
171+
String xContentTypeOptionsHeaderValue = "nosniff";
172+
String contentTypeHeaderValue = "application/json";
173+
174+
wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")).willReturn(ok()));
175+
176+
final String createTable =
177+
String.format(
178+
"CREATE TABLE http (\n"
179+
+ " last_name string"
180+
+ ") with (\n"
181+
+ " 'connector' = '%s',\n"
182+
+ " 'url' = '%s',\n"
183+
+ " 'format' = 'raw',\n"
184+
+ " 'gid.connector.http.sink.header.Origin' = '%s',\n"
185+
+ " 'gid.connector.http.sink.header.X-Content-Type-Options' = '%s',\n"
186+
+ " 'gid.connector.http.sink.header.Content-Type' = '%s'\n"
187+
+ ")",
188+
HttpDynamicTableSinkFactory.IDENTIFIER,
189+
"http://localhost:" + SERVER_PORT + "/myendpoint",
190+
originHeaderValue,
191+
xContentTypeOptionsHeaderValue,
192+
contentTypeHeaderValue
193+
);
194+
195+
tEnv.executeSql(createTable);
196+
197+
final String insert = "INSERT INTO http VALUES ('Clee')";
198+
tEnv.executeSql(insert).await();
199+
200+
var postedRequests = wireMockServer.findAll(anyRequestedFor(urlPathEqualTo("/myendpoint")));
201+
assertEquals(1, postedRequests.size());
202+
203+
var request = postedRequests.get(0);
204+
assertEquals("Clee", request.getBodyAsString());
205+
assertEquals(RequestMethod.POST, request.getMethod());
206+
assertEquals(contentTypeHeaderValue, request.getHeader("Content-Type"));
207+
assertEquals(originHeaderValue, request.getHeader("Origin"));
208+
assertEquals(xContentTypeOptionsHeaderValue, request.getHeader("X-Content-Type-Options"));
209+
}
165210
}

0 commit comments

Comments
 (0)