Skip to content

Commit 4205690

Browse files
Make the ingress client async. Fix #247 (#252)
1 parent 3c69539 commit 4205690

File tree

6 files changed

+207
-57
lines changed

6 files changed

+207
-57
lines changed

sdk-api-gen/src/main/resources/templates/Client.hbs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,21 @@ public class {{generatedClassSimpleName}} {
109109
{{outputSerdeFieldName}},
110110
{{#if inputEmpty}}null{{else}}req{{/if}},
111111
requestOptions);
112+
}
113+
114+
public {{#if outputEmpty}}java.util.concurrent.CompletableFuture<Void>{{else}}java.util.concurrent.CompletableFuture<{{{boxedOutputFqcn}}}>{{/if}} {{name}}Async({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
115+
return this.{{name}}Async(
116+
{{^inputEmpty}}req, {{/inputEmpty}}
117+
dev.restate.sdk.client.RequestOptions.DEFAULT);
118+
}
119+
120+
public {{#if outputEmpty}}java.util.concurrent.CompletableFuture<Void>{{else}}java.util.concurrent.CompletableFuture<{{{boxedOutputFqcn}}}>{{/if}} {{name}}Async({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
121+
return this.ingressClient.callAsync(
122+
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
123+
{{inputSerdeFieldName}},
124+
{{outputSerdeFieldName}},
125+
{{#if inputEmpty}}null{{else}}req{{/if}},
126+
requestOptions);
112127
}{{/handlers}}
113128

114129
public Send send() {
@@ -129,6 +144,20 @@ public class {{generatedClassSimpleName}} {
129144
{{inputSerdeFieldName}},
130145
{{#if inputEmpty}}null{{else}}req{{/if}},
131146
requestOptions);
147+
}
148+
149+
public java.util.concurrent.CompletableFuture<String> {{name}}Async({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
150+
return this.{{name}}Async(
151+
{{^inputEmpty}}req, {{/inputEmpty}}
152+
dev.restate.sdk.client.RequestOptions.DEFAULT);
153+
}
154+
155+
public java.util.concurrent.CompletableFuture<String> {{name}}Async({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
156+
return IngressClient.this.ingressClient.sendAsync(
157+
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
158+
{{inputSerdeFieldName}},
159+
{{#if inputEmpty}}null{{else}}req{{/if}},
160+
requestOptions);
132161
}{{/handlers}}
133162
}
134163
}

sdk-api-kotlin-gen/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies {
2121
testImplementation(testingLibs.assertj)
2222
testImplementation(coreLibs.protobuf.java)
2323
testImplementation(coreLibs.log4j.core)
24+
testImplementation(kotlinLibs.kotlinx.coroutines)
2425

2526
// Import test suites from sdk-core
2627
testImplementation(project(":sdk-core", "testArchive"))

sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import dev.restate.sdk.common.StateKey
66
import dev.restate.sdk.common.Serde
77
import dev.restate.sdk.common.Target
88
import kotlin.time.Duration
9+
import kotlinx.coroutines.future.await
910

1011
object {{generatedClassSimpleName}} {
1112

@@ -73,12 +74,12 @@ object {{generatedClassSimpleName}} {
7374

7475
{{#handlers}}
7576
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): {{{boxedOutputFqcn}}} {
76-
return this.ingressClient.call(
77+
return this.ingressClient.callAsync(
7778
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
7879
{{inputSerdeFieldName}},
7980
{{outputSerdeFieldName}},
8081
{{#if inputEmpty}}null{{else}}req{{/if}},
81-
requestOptions);
82+
requestOptions).await();
8283
}{{/handlers}}
8384

8485
fun send(): Send {
@@ -88,11 +89,11 @@ object {{generatedClassSimpleName}} {
8889
inner class Send {
8990
{{#handlers}}
9091
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): String {
91-
return this@IngressClient.ingressClient.send(
92+
return this@IngressClient.ingressClient.sendAsync(
9293
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this@IngressClient.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
9394
{{inputSerdeFieldName}},
9495
{{#if inputEmpty}}null{{else}}req{{/if}},
95-
requestOptions);
96+
requestOptions).await();
9697
}{{/handlers}}
9798
}
9899
}

sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java

Lines changed: 75 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@
1313
import com.fasterxml.jackson.core.JsonToken;
1414
import dev.restate.sdk.common.Serde;
1515
import dev.restate.sdk.common.Target;
16+
import java.io.ByteArrayInputStream;
1617
import java.io.IOException;
1718
import java.io.InputStream;
1819
import java.net.URI;
1920
import java.net.http.HttpClient;
2021
import java.net.http.HttpRequest;
2122
import java.net.http.HttpResponse;
22-
import java.nio.charset.StandardCharsets;
2323
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
2425

2526
public class DefaultIngressClient implements IngressClient {
2627

@@ -37,53 +38,58 @@ public DefaultIngressClient(HttpClient httpClient, String baseUri, Map<String, S
3738
}
3839

3940
@Override
40-
public <Req, Res> Res call(
41+
public <Req, Res> CompletableFuture<Res> callAsync(
4142
Target target,
4243
Serde<Req> reqSerde,
4344
Serde<Res> resSerde,
4445
Req req,
4546
RequestOptions requestOptions) {
4647
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, requestOptions);
47-
HttpResponse<byte[]> response;
48-
try {
49-
response = httpClient.send(request, HttpResponse.BodyHandlers.ofByteArray());
50-
} catch (IOException | InterruptedException e) {
51-
throw new RuntimeException("Error when executing the request", e);
52-
}
53-
54-
if (response.statusCode() != 200) {
55-
// Try to parse as string
56-
String error = new String(response.body(), StandardCharsets.UTF_8);
57-
throw new RuntimeException(
58-
"Received non OK status code: " + response.statusCode() + ". Body: " + error);
59-
}
60-
61-
return resSerde.deserialize(response.body());
48+
return httpClient
49+
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
50+
.handle(
51+
(response, throwable) -> {
52+
if (throwable != null) {
53+
throw new IngressException("Error when executing the request", throwable);
54+
}
55+
56+
if (response.statusCode() >= 300) {
57+
handleNonSuccessResponse(response);
58+
}
59+
60+
try {
61+
return resSerde.deserialize(response.body());
62+
} catch (Exception e) {
63+
throw new IngressException(
64+
"Cannot deserialize the response", response.statusCode(), response.body(), e);
65+
}
66+
});
6267
}
6368

6469
@Override
65-
public <Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOptions options) {
70+
public <Req> CompletableFuture<String> sendAsync(
71+
Target target, Serde<Req> reqSerde, Req req, RequestOptions options) {
6672
HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, options);
67-
HttpResponse<InputStream> response;
68-
try {
69-
response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
70-
} catch (IOException | InterruptedException e) {
71-
throw new RuntimeException("Error when executing the request", e);
72-
}
73-
74-
try (InputStream in = response.body()) {
75-
if (response.statusCode() >= 300) {
76-
// Try to parse as string
77-
String error = new String(in.readAllBytes(), StandardCharsets.UTF_8);
78-
throw new RuntimeException(
79-
"Received non OK status code: " + response.statusCode() + ". Body: " + error);
80-
}
81-
return deserializeInvocationId(in);
82-
} catch (IOException e) {
83-
throw new RuntimeException(
84-
"Error when trying to read the response, when status code was " + response.statusCode(),
85-
e);
86-
}
73+
return httpClient
74+
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
75+
.handle(
76+
(response, throwable) -> {
77+
if (throwable != null) {
78+
throw new IngressException("Error when executing the request", throwable);
79+
}
80+
81+
if (response.statusCode() >= 300) {
82+
handleNonSuccessResponse(response);
83+
}
84+
85+
try {
86+
return findStringFieldInJsonObject(
87+
new ByteArrayInputStream(response.body()), "invocationId");
88+
} catch (Exception e) {
89+
throw new IngressException(
90+
"Cannot deserialize the response", response.statusCode(), response.body(), e);
91+
}
92+
});
8793
}
8894

8995
private URI toRequestURI(Target target, boolean isSend) {
@@ -128,23 +134,43 @@ private <Req> HttpRequest prepareHttpRequest(
128134
return reqBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(reqSerde.serialize(req))).build();
129135
}
130136

131-
private static String deserializeInvocationId(InputStream body) throws IOException {
137+
private void handleNonSuccessResponse(HttpResponse<byte[]> response) {
138+
if (response.headers().firstValue("content-type").orElse("").contains("application/json")) {
139+
String errorMessage;
140+
// Let's try to parse the message field
141+
try {
142+
errorMessage =
143+
findStringFieldInJsonObject(new ByteArrayInputStream(response.body()), "message");
144+
} catch (Exception e) {
145+
throw new IngressException(
146+
"Can't decode error response from ingress", response.statusCode(), response.body(), e);
147+
}
148+
throw new IngressException(errorMessage, response.statusCode(), response.body());
149+
}
150+
151+
// Fallback error
152+
throw new IngressException(
153+
"Received non success status code", response.statusCode(), response.body());
154+
}
155+
156+
private static String findStringFieldInJsonObject(InputStream body, String fieldName)
157+
throws IOException {
132158
try (JsonParser parser = JSON_FACTORY.createParser(body)) {
133159
if (parser.nextToken() != JsonToken.START_OBJECT) {
134160
throw new IllegalStateException(
135161
"Expecting token " + JsonToken.START_OBJECT + ", got " + parser.getCurrentToken());
136162
}
137-
String fieldName = parser.nextFieldName();
138-
if (fieldName == null || !fieldName.equalsIgnoreCase("invocationid")) {
139-
throw new IllegalStateException(
140-
"Expecting token \"invocationId\", got " + parser.getCurrentToken());
141-
}
142-
String invocationId = parser.nextTextValue();
143-
if (invocationId == null) {
144-
throw new IllegalStateException(
145-
"Expecting token " + JsonToken.VALUE_STRING + ", got " + parser.getCurrentToken());
163+
for (String actualFieldName = parser.nextFieldName();
164+
actualFieldName != null;
165+
actualFieldName = parser.nextFieldName()) {
166+
if (actualFieldName.equalsIgnoreCase(fieldName)) {
167+
return parser.nextTextValue();
168+
} else {
169+
parser.nextValue();
170+
}
146171
}
147-
return invocationId;
172+
throw new IllegalStateException(
173+
"Expecting field name \"" + fieldName + "\", got " + parser.getCurrentToken());
148174
}
149175
}
150176
}

sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,57 @@
1313
import java.net.http.HttpClient;
1414
import java.util.Collections;
1515
import java.util.Map;
16+
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.CompletionException;
1618

1719
public interface IngressClient {
18-
<Req, Res> Res call(
20+
21+
<Req, Res> CompletableFuture<Res> callAsync(
1922
Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req, RequestOptions options);
2023

21-
default <Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req) {
24+
default <Req, Res> CompletableFuture<Res> callAsync(
25+
Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req) {
26+
return callAsync(target, reqSerde, resSerde, req, RequestOptions.DEFAULT);
27+
}
28+
29+
default <Req, Res> Res call(
30+
Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req, RequestOptions options)
31+
throws IngressException {
32+
try {
33+
return callAsync(target, reqSerde, resSerde, req, options).join();
34+
} catch (CompletionException e) {
35+
if (e.getCause() instanceof RuntimeException) {
36+
throw (RuntimeException) e.getCause();
37+
}
38+
throw new RuntimeException(e.getCause());
39+
}
40+
}
41+
42+
default <Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req)
43+
throws IngressException {
2244
return call(target, reqSerde, resSerde, req, RequestOptions.DEFAULT);
2345
}
2446

25-
<Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOptions options);
47+
<Req> CompletableFuture<String> sendAsync(
48+
Target target, Serde<Req> reqSerde, Req req, RequestOptions options);
49+
50+
default <Req> CompletableFuture<String> sendAsync(Target target, Serde<Req> reqSerde, Req req) {
51+
return sendAsync(target, reqSerde, req, RequestOptions.DEFAULT);
52+
}
53+
54+
default <Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOptions options)
55+
throws IngressException {
56+
try {
57+
return sendAsync(target, reqSerde, req, options).join();
58+
} catch (CompletionException e) {
59+
if (e.getCause() instanceof RuntimeException) {
60+
throw (RuntimeException) e.getCause();
61+
}
62+
throw new RuntimeException(e.getCause());
63+
}
64+
}
2665

27-
default <Req> String send(Target target, Serde<Req> reqSerde, Req req) {
66+
default <Req> String send(Target target, Serde<Req> reqSerde, Req req) throws IngressException {
2867
return send(target, reqSerde, req, RequestOptions.DEFAULT);
2968
}
3069

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate Java SDK,
4+
// which is released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
9+
package dev.restate.sdk.client;
10+
11+
import java.nio.charset.StandardCharsets;
12+
import org.jspecify.annotations.Nullable;
13+
14+
public class IngressException extends RuntimeException {
15+
16+
private final int statusCode;
17+
private final byte[] responseBody;
18+
19+
public IngressException(String message, Throwable cause) {
20+
this(message, -1, null, cause);
21+
}
22+
23+
public IngressException(String message, int statusCode, byte[] responseBody) {
24+
this(message, statusCode, responseBody, null);
25+
}
26+
27+
public IngressException(String message, int statusCode, byte[] responseBody, Throwable cause) {
28+
super(message, cause);
29+
this.statusCode = statusCode;
30+
this.responseBody = responseBody;
31+
}
32+
33+
public int getStatusCode() {
34+
return statusCode;
35+
}
36+
37+
public byte @Nullable [] getResponseBody() {
38+
return responseBody;
39+
}
40+
41+
@Override
42+
public String toString() {
43+
return "IngressException{"
44+
+ "statusCode="
45+
+ statusCode
46+
+ ", responseBody='"
47+
+ new String(responseBody, StandardCharsets.UTF_8)
48+
+ '\''
49+
+ ", message='"
50+
+ this.getMessage()
51+
+ '\''
52+
+ '}';
53+
}
54+
}

0 commit comments

Comments
 (0)