Skip to content

Commit f8244e2

Browse files
Add support for Send delay in ingress client (#275)
1 parent ed651fd commit f8244e2

File tree

6 files changed

+60
-16
lines changed

6 files changed

+60
-16
lines changed

sdk-api-gen/src/main/resources/templates/Client.hbs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,21 @@ public class {{generatedClassSimpleName}} {
117117
}{{/handlers}}
118118

119119
public Send send() {
120-
return new Send();
120+
return new Send(null);
121+
}
122+
123+
public Send send(Duration delay) {
124+
return new Send(delay);
121125
}
122126

123127
public class Send {
128+
129+
private final Duration delay;
130+
131+
Send(Duration delay) {
132+
this.delay = delay;
133+
}
134+
124135
{{#handlers}}
125136
public String {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
126137
return this.{{name}}(
@@ -133,6 +144,7 @@ public class {{generatedClassSimpleName}} {
133144
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
134145
{{inputSerdeFieldName}},
135146
{{#if inputEmpty}}null{{else}}req{{/if}},
147+
this.delay,
136148
requestOptions);
137149
}
138150

@@ -147,6 +159,7 @@ public class {{generatedClassSimpleName}} {
147159
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
148160
{{inputSerdeFieldName}},
149161
{{#if inputEmpty}}null{{else}}req{{/if}},
162+
this.delay,
150163
requestOptions);
151164
}{{/handlers}}
152165
}

sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,18 @@ object {{generatedClassSimpleName}} {
6868
requestOptions);
6969
}{{/handlers}}
7070

71-
fun send(): Send {
72-
return Send()
71+
fun send(delay: Duration = Duration.ZERO): Send {
72+
return Send(delay)
7373
}
7474

75-
inner class Send {
75+
inner class Send(private val delay: Duration) {
7676
{{#handlers}}
7777
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): String {
7878
return this@IngressClient.ingressClient.sendSuspend(
7979
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this@IngressClient.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
8080
{{inputSerdeFieldName}},
8181
{{#if inputEmpty}}Unit{{else}}req{{/if}},
82+
delay,
8283
requestOptions);
8384
}{{/handlers}}
8485
}

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import dev.restate.sdk.client.IngressClient
1212
import dev.restate.sdk.client.RequestOptions
1313
import dev.restate.sdk.common.Serde
1414
import dev.restate.sdk.common.Target
15+
import kotlin.time.Duration
16+
import kotlin.time.toJavaDuration
1517
import kotlinx.coroutines.future.await
1618

1719
// Extension methods for the IngressClient
@@ -30,9 +32,10 @@ suspend fun <Req> IngressClient.sendSuspend(
3032
target: Target,
3133
reqSerde: Serde<Req>,
3234
req: Req,
35+
delay: Duration = Duration.ZERO,
3336
options: RequestOptions = RequestOptions.DEFAULT
3437
): String {
35-
return this.sendAsync(target, reqSerde, req, options).await()
38+
return this.sendAsync(target, reqSerde, req, delay.toJavaDuration(), options).await()
3639
}
3740

3841
suspend fun <T> IngressClient.AwakeableHandle.resolveSuspend(serde: Serde<T>, payload: T) {

sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.net.http.HttpClient;
2121
import java.net.http.HttpRequest;
2222
import java.net.http.HttpResponse;
23+
import java.time.Duration;
2324
import java.util.Map;
2425
import java.util.concurrent.CompletableFuture;
2526
import org.jspecify.annotations.NonNull;
@@ -45,7 +46,7 @@ public <Req, Res> CompletableFuture<Res> callAsync(
4546
Serde<Res> resSerde,
4647
Req req,
4748
RequestOptions requestOptions) {
48-
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, requestOptions);
49+
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, null, requestOptions);
4950
return httpClient
5051
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
5152
.handle(
@@ -69,8 +70,8 @@ public <Req, Res> CompletableFuture<Res> callAsync(
6970

7071
@Override
7172
public <Req> CompletableFuture<String> sendAsync(
72-
Target target, Serde<Req> reqSerde, Req req, RequestOptions options) {
73-
HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, options);
73+
Target target, Serde<Req> reqSerde, Req req, Duration delay, RequestOptions options) {
74+
HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, delay, options);
7475
return httpClient
7576
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
7677
.handle(
@@ -162,7 +163,7 @@ public CompletableFuture<Void> rejectAsync(String reason) {
162163
};
163164
}
164165

165-
private URI toRequestURI(Target target, boolean isSend) {
166+
private URI toRequestURI(Target target, boolean isSend, Duration delay) {
166167
StringBuilder builder = new StringBuilder();
167168
builder.append("/").append(target.getComponent());
168169
if (target.getKey() != null) {
@@ -172,13 +173,21 @@ private URI toRequestURI(Target target, boolean isSend) {
172173
if (isSend) {
173174
builder.append("/send");
174175
}
176+
if (delay != null && !delay.isZero() && !delay.isNegative()) {
177+
builder.append("?delay=").append(delay);
178+
}
175179

176180
return this.baseUri.resolve(builder.toString());
177181
}
178182

179183
private <Req> HttpRequest prepareHttpRequest(
180-
Target target, boolean isSend, Serde<Req> reqSerde, Req req, RequestOptions options) {
181-
var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend));
184+
Target target,
185+
boolean isSend,
186+
Serde<Req> reqSerde,
187+
Req req,
188+
Duration delay,
189+
RequestOptions options) {
190+
var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend, delay));
182191

183192
// Add content-type
184193
if (reqSerde.contentType() != null) {

sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
import dev.restate.sdk.common.Serde;
1212
import dev.restate.sdk.common.Target;
1313
import java.net.http.HttpClient;
14+
import java.time.Duration;
1415
import java.util.Collections;
1516
import java.util.Map;
1617
import java.util.concurrent.CompletableFuture;
1718
import java.util.concurrent.CompletionException;
1819
import org.jspecify.annotations.NonNull;
20+
import org.jspecify.annotations.Nullable;
1921

2022
public interface IngressClient {
2123

@@ -46,16 +48,26 @@ default <Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSe
4648
}
4749

4850
<Req> CompletableFuture<String> sendAsync(
49-
Target target, Serde<Req> reqSerde, Req req, RequestOptions options);
51+
Target target,
52+
Serde<Req> reqSerde,
53+
Req req,
54+
@Nullable Duration delay,
55+
RequestOptions options);
56+
57+
default <Req> CompletableFuture<String> sendAsync(
58+
Target target, Serde<Req> reqSerde, Req req, @Nullable Duration delay) {
59+
return sendAsync(target, reqSerde, req, delay, RequestOptions.DEFAULT);
60+
}
5061

5162
default <Req> CompletableFuture<String> sendAsync(Target target, Serde<Req> reqSerde, Req req) {
52-
return sendAsync(target, reqSerde, req, RequestOptions.DEFAULT);
63+
return sendAsync(target, reqSerde, req, null, RequestOptions.DEFAULT);
5364
}
5465

55-
default <Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOptions options)
66+
default <Req> String send(
67+
Target target, Serde<Req> reqSerde, Req req, @Nullable Duration delay, RequestOptions options)
5668
throws IngressException {
5769
try {
58-
return sendAsync(target, reqSerde, req, options).join();
70+
return sendAsync(target, reqSerde, req, delay, options).join();
5971
} catch (CompletionException e) {
6072
if (e.getCause() instanceof RuntimeException) {
6173
throw (RuntimeException) e.getCause();
@@ -64,8 +76,13 @@ default <Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOp
6476
}
6577
}
6678

79+
default <Req> String send(Target target, Serde<Req> reqSerde, Req req, @Nullable Duration delay)
80+
throws IngressException {
81+
return send(target, reqSerde, req, delay, RequestOptions.DEFAULT);
82+
}
83+
6784
default <Req> String send(Target target, Serde<Req> reqSerde, Req req) throws IngressException {
68-
return send(target, reqSerde, req, RequestOptions.DEFAULT);
85+
return send(target, reqSerde, req, null, RequestOptions.DEFAULT);
6986
}
7087

7188
/**

sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ public static void invokeSharedSend(
218218
Target.service(workflowName, handlerName),
219219
WorkflowImpl.INVOKE_REQUEST_SERDE,
220220
InvokeRequest.fromAny(workflowKey, payload),
221+
null,
221222
RequestOptions.DEFAULT);
222223
}
223224

0 commit comments

Comments
 (0)