Skip to content

Commit aee2f71

Browse files
Add RequestOptions to ingress requests. (#250)
1 parent 6bbf851 commit aee2f71

File tree

6 files changed

+176
-21
lines changed

6 files changed

+176
-21
lines changed

sdk-api-gen/src/main/resources/templates/Client.hbs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,18 @@ public class {{generatedClassSimpleName}} {
9797

9898
{{#handlers}}
9999
public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
100+
{{^outputEmpty}}return {{/outputEmpty}}this.{{name}}(
101+
{{^inputEmpty}}req, {{/inputEmpty}}
102+
dev.restate.sdk.client.RequestOptions.DEFAULT);
103+
}
104+
105+
public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{name}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
100106
{{^outputEmpty}}return {{/outputEmpty}}this.ingressClient.call(
101107
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
102108
{{inputSerdeFieldName}},
103109
{{outputSerdeFieldName}},
104-
{{#if inputEmpty}}null{{else}}req{{/if}});
110+
{{#if inputEmpty}}null{{else}}req{{/if}},
111+
requestOptions);
105112
}{{/handlers}}
106113

107114
public Send send() {
@@ -111,10 +118,17 @@ public class {{generatedClassSimpleName}} {
111118
public class Send {
112119
{{#handlers}}
113120
public String {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
121+
return this.{{name}}(
122+
{{^inputEmpty}}req, {{/inputEmpty}}
123+
dev.restate.sdk.client.RequestOptions.DEFAULT);
124+
}
125+
126+
public String {{name}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
114127
return IngressClient.this.ingressClient.send(
115128
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
116129
{{inputSerdeFieldName}},
117-
{{#if inputEmpty}}null{{else}}req{{/if}});
130+
{{#if inputEmpty}}null{{else}}req{{/if}},
131+
requestOptions);
118132
}{{/handlers}}
119133
}
120134
}

sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,13 @@ object {{generatedClassSimpleName}} {
7272
class IngressClient(private val ingressClient: dev.restate.sdk.client.IngressClient{{#isObject}}, private val key: String{{/isObject}}) {
7373

7474
{{#handlers}}
75-
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}{{/inputEmpty}}): {{{boxedOutputFqcn}}} {
75+
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): {{{boxedOutputFqcn}}} {
7676
return this.ingressClient.call(
7777
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
7878
{{inputSerdeFieldName}},
7979
{{outputSerdeFieldName}},
80-
{{#if inputEmpty}}null{{else}}req{{/if}});
80+
{{#if inputEmpty}}null{{else}}req{{/if}},
81+
requestOptions);
8182
}{{/handlers}}
8283

8384
fun send(): Send {
@@ -86,11 +87,12 @@ object {{generatedClassSimpleName}} {
8687

8788
inner class Send {
8889
{{#handlers}}
89-
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}{{/inputEmpty}}): String {
90+
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): String {
9091
return this@IngressClient.ingressClient.send(
9192
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this@IngressClient.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
9293
{{inputSerdeFieldName}},
93-
{{#if inputEmpty}}null{{else}}req{{/if}});
94+
{{#if inputEmpty}}null{{else}}req{{/if}},
95+
requestOptions);
9496
}{{/handlers}}
9597
}
9698
}

sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,30 @@
2020
import java.net.http.HttpRequest;
2121
import java.net.http.HttpResponse;
2222
import java.nio.charset.StandardCharsets;
23+
import java.util.Map;
2324

2425
public class DefaultIngressClient implements IngressClient {
2526

2627
private static final JsonFactory JSON_FACTORY = new JsonFactory();
2728

2829
private final HttpClient httpClient;
2930
private final URI baseUri;
31+
private final Map<String, String> headers;
3032

31-
public DefaultIngressClient(HttpClient httpClient, String baseUri) {
33+
public DefaultIngressClient(HttpClient httpClient, String baseUri, Map<String, String> headers) {
3234
this.httpClient = httpClient;
3335
this.baseUri = URI.create(baseUri);
36+
this.headers = headers;
3437
}
3538

3639
@Override
37-
public <Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req) {
38-
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req);
40+
public <Req, Res> Res call(
41+
Target target,
42+
Serde<Req> reqSerde,
43+
Serde<Res> resSerde,
44+
Req req,
45+
RequestOptions requestOptions) {
46+
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, requestOptions);
3947
HttpResponse<byte[]> response;
4048
try {
4149
response = httpClient.send(request, HttpResponse.BodyHandlers.ofByteArray());
@@ -54,8 +62,8 @@ public <Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSer
5462
}
5563

5664
@Override
57-
public <Req> String send(Target target, Serde<Req> reqSerde, Req req) {
58-
HttpRequest request = prepareHttpRequest(target, true, reqSerde, req);
65+
public <Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOptions options) {
66+
HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, options);
5967
HttpResponse<InputStream> response;
6068
try {
6169
response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
@@ -93,11 +101,30 @@ private URI toRequestURI(Target target, boolean isSend) {
93101
}
94102

95103
private <Req> HttpRequest prepareHttpRequest(
96-
Target target, boolean isSend, Serde<Req> reqSerde, Req req) {
104+
Target target, boolean isSend, Serde<Req> reqSerde, Req req, RequestOptions options) {
97105
var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend));
106+
107+
// Add content-type
98108
if (reqSerde.contentType() != null) {
99109
reqBuilder.header("content-type", reqSerde.contentType());
100110
}
111+
112+
// Add headers
113+
this.headers.forEach(reqBuilder::header);
114+
115+
// Add idempotency key and period
116+
if (options.getIdempotencyKey() != null) {
117+
reqBuilder.header("idempotency-key", options.getIdempotencyKey());
118+
}
119+
if (options.getIdempotencyRetainPeriod() != null) {
120+
reqBuilder.header(
121+
"idempotency-retention-period",
122+
String.valueOf(options.getIdempotencyRetainPeriod().toSeconds()));
123+
}
124+
125+
// Add additional headers
126+
options.getAdditionalHeaders().forEach(reqBuilder::header);
127+
101128
return reqBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(reqSerde.serialize(req))).build();
102129
}
103130

sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,28 @@
1111
import dev.restate.sdk.common.Serde;
1212
import dev.restate.sdk.common.Target;
1313
import java.net.http.HttpClient;
14+
import java.util.Collections;
15+
import java.util.Map;
1416

1517
public interface IngressClient {
16-
<Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req);
18+
<Req, Res> Res call(
19+
Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req, RequestOptions options);
1720

18-
<Req> String send(Target target, Serde<Req> reqSerde, Req req);
21+
default <Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req) {
22+
return call(target, reqSerde, resSerde, req, RequestOptions.DEFAULT);
23+
}
24+
25+
<Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOptions options);
26+
27+
default <Req> String send(Target target, Serde<Req> reqSerde, Req req) {
28+
return send(target, reqSerde, req, RequestOptions.DEFAULT);
29+
}
1930

2031
static IngressClient defaultClient(String baseUri) {
21-
return new DefaultIngressClient(HttpClient.newHttpClient(), baseUri);
32+
return defaultClient(baseUri, Collections.emptyMap());
33+
}
34+
35+
static IngressClient defaultClient(String baseUri, Map<String, String> headers) {
36+
return new DefaultIngressClient(HttpClient.newHttpClient(), baseUri, headers);
2237
}
2338
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate Java SDK,
4+
// which is released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
9+
package dev.restate.sdk.client;
10+
11+
import java.time.Duration;
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
import java.util.Objects;
15+
16+
public class RequestOptions {
17+
18+
public static final RequestOptions DEFAULT = new RequestOptions();
19+
20+
private String idempotencyKey;
21+
private Duration idempotencyRetainPeriod;
22+
private final Map<String, String> additionalHeaders = new HashMap<>();
23+
24+
public RequestOptions withIdempotency(String idempotencyKey) {
25+
this.idempotencyKey = idempotencyKey;
26+
return this;
27+
}
28+
29+
public RequestOptions withIdempotency(String idempotencyKey, Duration idempotencyRetainPeriod) {
30+
this.idempotencyKey = idempotencyKey;
31+
this.idempotencyRetainPeriod = idempotencyRetainPeriod;
32+
return this;
33+
}
34+
35+
public RequestOptions withHeader(String name, String value) {
36+
this.additionalHeaders.put(name, value);
37+
return this;
38+
}
39+
40+
public RequestOptions withHeaders(Map<String, String> additionalHeaders) {
41+
this.additionalHeaders.putAll(additionalHeaders);
42+
return this;
43+
}
44+
45+
public String getIdempotencyKey() {
46+
return idempotencyKey;
47+
}
48+
49+
public Duration getIdempotencyRetainPeriod() {
50+
return idempotencyRetainPeriod;
51+
}
52+
53+
public Map<String, String> getAdditionalHeaders() {
54+
return additionalHeaders;
55+
}
56+
57+
@Override
58+
public boolean equals(Object o) {
59+
if (this == o) return true;
60+
if (o == null || getClass() != o.getClass()) return false;
61+
62+
RequestOptions that = (RequestOptions) o;
63+
64+
if (!Objects.equals(idempotencyKey, that.idempotencyKey)) return false;
65+
if (!Objects.equals(idempotencyRetainPeriod, that.idempotencyRetainPeriod)) return false;
66+
return Objects.equals(additionalHeaders, that.additionalHeaders);
67+
}
68+
69+
@Override
70+
public int hashCode() {
71+
int result = idempotencyKey != null ? idempotencyKey.hashCode() : 0;
72+
result =
73+
31 * result + (idempotencyRetainPeriod != null ? idempotencyRetainPeriod.hashCode() : 0);
74+
result = 31 * result + (additionalHeaders != null ? additionalHeaders.hashCode() : 0);
75+
return result;
76+
}
77+
78+
@Override
79+
public String toString() {
80+
return "RequestOptions{"
81+
+ "idempotencyKey='"
82+
+ idempotencyKey
83+
+ '\''
84+
+ ", idempotencyRetainPeriod="
85+
+ idempotencyRetainPeriod
86+
+ ", additionalHeaders="
87+
+ additionalHeaders
88+
+ '}';
89+
}
90+
}

sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import dev.restate.sdk.Awaitable;
1414
import dev.restate.sdk.Context;
1515
import dev.restate.sdk.client.IngressClient;
16+
import dev.restate.sdk.client.RequestOptions;
1617
import dev.restate.sdk.common.*;
1718
import dev.restate.sdk.workflow.WorkflowExecutionState;
1819
import dev.restate.sdk.workflow.generated.GetOutputResponse;
@@ -151,7 +152,8 @@ public static WorkflowExecutionState submit(
151152
Target.service(workflowName, "submit"),
152153
WorkflowImpl.INVOKE_REQUEST_SERDE,
153154
WorkflowImpl.WORKFLOW_EXECUTION_STATE_SERDE,
154-
InvokeRequest.fromAny(workflowKey, payload));
155+
InvokeRequest.fromAny(workflowKey, payload),
156+
RequestOptions.DEFAULT);
155157
}
156158

157159
public static <T> Optional<T> getOutput(
@@ -162,7 +164,8 @@ public static <T> Optional<T> getOutput(
162164
workflowManagerObjectName(workflowName), workflowKey, "getOutput"),
163165
CoreSerdes.VOID,
164166
WorkflowImpl.GET_OUTPUT_RESPONSE_SERDE,
165-
null);
167+
null,
168+
RequestOptions.DEFAULT);
166169
if (response.hasNotCompleted()) {
167170
return Optional.empty();
168171
}
@@ -181,7 +184,8 @@ public static boolean isCompleted(
181184
workflowManagerObjectName(workflowName), workflowKey, "getOutput"),
182185
CoreSerdes.VOID,
183186
WorkflowImpl.GET_OUTPUT_RESPONSE_SERDE,
184-
null);
187+
null,
188+
RequestOptions.DEFAULT);
185189
if (response.hasFailure()) {
186190
throw new TerminalException(
187191
response.getFailure().getCode(), response.getFailure().getMessage());
@@ -200,7 +204,8 @@ public static <T> T invokeShared(
200204
Target.service(workflowName, handlerName),
201205
WorkflowImpl.INVOKE_REQUEST_SERDE,
202206
resSerde,
203-
InvokeRequest.fromAny(workflowKey, payload));
207+
InvokeRequest.fromAny(workflowKey, payload),
208+
RequestOptions.DEFAULT);
204209
}
205210

206211
public static void invokeSharedSend(
@@ -212,7 +217,8 @@ public static void invokeSharedSend(
212217
ingressClient.send(
213218
Target.service(workflowName, handlerName),
214219
WorkflowImpl.INVOKE_REQUEST_SERDE,
215-
InvokeRequest.fromAny(workflowKey, payload));
220+
InvokeRequest.fromAny(workflowKey, payload),
221+
RequestOptions.DEFAULT);
216222
}
217223

218224
public static <T> Optional<T> getState(
@@ -223,7 +229,8 @@ public static <T> Optional<T> getState(
223229
workflowManagerObjectName(workflowName), workflowKey, "getState"),
224230
CoreSerdes.JSON_STRING,
225231
WorkflowImpl.GET_STATE_RESPONSE_SERDE,
226-
key.name());
232+
key.name(),
233+
RequestOptions.DEFAULT);
227234
if (response.hasEmpty()) {
228235
return Optional.empty();
229236
}

0 commit comments

Comments
 (0)