Skip to content

Commit 31f14f5

Browse files
API feedback (#481)
* Add interface for the builder options to make sure you don't get the builder terminal methods * Fix suspension logging * Add overloads in Endpoint builder factory methods * Changes to clients and Requests/Handlers class * Fix kotlinx serde API usage * Fix javadocs
1 parent 140247f commit 31f14f5

File tree

98 files changed

+1098
-1368
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+1098
-1368
lines changed

client-kotlin/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,6 @@ description = "Restate Client to interact with services from within other Kotlin
88
dependencies {
99
api(project(":client"))
1010
implementation(libs.kotlinx.coroutines.core)
11+
12+
runtimeOnly(project(":sdk-serde-kotlinx"))
1113
}

client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import dev.restate.client.ClientResponse
1414
import dev.restate.client.SendResponse
1515
import dev.restate.common.Output
1616
import dev.restate.common.Request
17+
import dev.restate.common.WorkflowRequest
1718
import dev.restate.serde.Serde
19+
import kotlin.time.Duration
20+
import kotlin.time.toJavaDuration
1821
import kotlinx.coroutines.future.await
1922

2023
// Extension methods for the Client
@@ -25,26 +28,46 @@ fun clientRequestOptions(init: ClientRequestOptions.Builder.() -> Unit): ClientR
2528
return builder.build()
2629
}
2730

31+
/** Shorthand for [callSuspend] */
32+
suspend fun <Req, Res> Request<Req, Res>.call(client: Client): ClientResponse<Res> {
33+
return client.callSuspend(this)
34+
}
35+
36+
/** Suspend version of [Client.callAsync] */
2837
suspend fun <Req, Res> Client.callSuspend(request: Request<Req, Res>): ClientResponse<Res> {
2938
return this.callAsync(request).await()
3039
}
3140

32-
suspend fun <Req, Res> Client.callSuspend(
33-
requestBuilder: Request.Builder<Req, Res>
34-
): ClientResponse<Res> {
35-
return this.callAsync(requestBuilder).await()
41+
/** Shorthand for [sendSuspend] */
42+
suspend fun <Req, Res> Request<Req, Res>.send(
43+
client: Client,
44+
delay: Duration? = null
45+
): ClientResponse<SendResponse<Res>> {
46+
return client.sendSuspend(this, delay)
3647
}
3748

49+
/** Suspend version of [Client.sendAsync] */
3850
suspend fun <Req, Res> Client.sendSuspend(
39-
request: Request<Req, Res>
51+
request: Request<Req, Res>,
52+
delay: Duration? = null
4053
): ClientResponse<SendResponse<Res>> {
41-
return this.sendAsync(request).await()
54+
return this.sendAsync(request, delay?.toJavaDuration()).await()
4255
}
4356

44-
suspend fun <Req, Res> Client.sendSuspend(
45-
request: Request.Builder<Req, Res>
57+
/** Shorthand for [submitSuspend] */
58+
suspend fun <Req, Res> WorkflowRequest<Req, Res>.submit(
59+
client: Client,
60+
delay: Duration? = null
61+
): ClientResponse<SendResponse<Res>> {
62+
return client.submitSuspend(this, delay)
63+
}
64+
65+
/** Suspend version of [Client.submitAsync] */
66+
suspend fun <Req, Res> Client.submitSuspend(
67+
request: WorkflowRequest<Req, Res>,
68+
delay: Duration? = null
4669
): ClientResponse<SendResponse<Res>> {
47-
return this.sendSuspend(request.build())
70+
return this.submitAsync(request, delay?.toJavaDuration()).await()
4871
}
4972

5073
suspend fun <T : Any> Client.AwakeableHandle.resolveSuspend(

client/build.gradle.kts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,5 @@ dependencies {
1515
implementation(libs.jackson.core)
1616
implementation(libs.log4j.api)
1717

18-
testImplementation(libs.junit.jupiter)
19-
testImplementation(libs.assertj)
18+
runtimeOnly(project(":sdk-serde-jackson"))
2019
}

client/src/main/java/dev/restate/client/Client.java

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,20 @@
1111
import dev.restate.common.Output;
1212
import dev.restate.common.Request;
1313
import dev.restate.common.Target;
14+
import dev.restate.common.WorkflowRequest;
1415
import dev.restate.serde.Serde;
1516
import dev.restate.serde.SerdeFactory;
1617
import dev.restate.serde.TypeTag;
18+
import java.time.Duration;
1719
import java.util.concurrent.CompletableFuture;
1820
import java.util.concurrent.CompletionException;
1921
import org.jspecify.annotations.NonNull;
22+
import org.jspecify.annotations.Nullable;
2023

2124
public interface Client {
2225

2326
<Req, Res> CompletableFuture<ClientResponse<Res>> callAsync(Request<Req, Res> request);
2427

25-
default <Req, Res> CompletableFuture<ClientResponse<Res>> callAsync(
26-
Request.Builder<Req, Res> request) {
27-
return callAsync(request.build());
28-
}
29-
3028
default <Req, Res> ClientResponse<Res> call(Request<Req, Res> request) throws IngressException {
3129
try {
3230
return callAsync(request).join();
@@ -38,18 +36,23 @@ default <Req, Res> ClientResponse<Res> call(Request<Req, Res> request) throws In
3836
}
3937
}
4038

41-
default <Req, Res> ClientResponse<Res> call(Request.Builder<Req, Res> request)
39+
default <Req, Res> CompletableFuture<ClientResponse<SendResponse<Res>>> sendAsync(
40+
Request<Req, Res> request) {
41+
return sendAsync(request, null);
42+
}
43+
44+
default <Req, Res> ClientResponse<SendResponse<Res>> send(Request<Req, Res> request)
4245
throws IngressException {
43-
return call(request.build());
46+
return send(request, null);
4447
}
4548

4649
<Req, Res> CompletableFuture<ClientResponse<SendResponse<Res>>> sendAsync(
47-
Request<Req, Res> request);
50+
Request<Req, Res> request, @Nullable Duration delay);
4851

49-
default <Req, Res> ClientResponse<SendResponse<Res>> send(Request<Req, Res> request)
50-
throws IngressException {
52+
default <Req, Res> ClientResponse<SendResponse<Res>> send(
53+
Request<Req, Res> request, @Nullable Duration delay) throws IngressException {
5154
try {
52-
return sendAsync(request).join();
55+
return sendAsync(request, delay).join();
5356
} catch (CompletionException e) {
5457
if (e.getCause() instanceof RuntimeException) {
5558
throw (RuntimeException) e.getCause();
@@ -58,14 +61,31 @@ default <Req, Res> ClientResponse<SendResponse<Res>> send(Request<Req, Res> requ
5861
}
5962
}
6063

61-
default <Req, Res> CompletableFuture<ClientResponse<SendResponse<Res>>> sendAsync(
62-
Request.Builder<Req, Res> request) {
63-
return sendAsync(request.build());
64+
default <Req, Res> CompletableFuture<ClientResponse<SendResponse<Res>>> submitAsync(
65+
WorkflowRequest<Req, Res> request) {
66+
return submitAsync(request, null);
6467
}
6568

66-
default <Req, Res> ClientResponse<SendResponse<Res>> send(Request.Builder<Req, Res> request)
69+
default <Req, Res> ClientResponse<SendResponse<Res>> submit(WorkflowRequest<Req, Res> request)
6770
throws IngressException {
68-
return send(request.build());
71+
return submit(request, null);
72+
}
73+
74+
default <Req, Res> CompletableFuture<ClientResponse<SendResponse<Res>>> submitAsync(
75+
WorkflowRequest<Req, Res> request, @Nullable Duration delay) {
76+
return sendAsync(request, delay);
77+
}
78+
79+
default <Req, Res> ClientResponse<SendResponse<Res>> submit(
80+
WorkflowRequest<Req, Res> request, @Nullable Duration delay) throws IngressException {
81+
try {
82+
return submitAsync(request, delay).join();
83+
} catch (CompletionException e) {
84+
if (e.getCause() instanceof RuntimeException) {
85+
throw (RuntimeException) e.getCause();
86+
}
87+
throw new RuntimeException(e.getCause());
88+
}
6989
}
7090

7191
/**

client/src/main/java/dev/restate/client/base/BaseClient.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -54,53 +54,55 @@ protected BaseClient(URI baseUri, SerdeFactory serdeFactory, ClientRequestOption
5454

5555
@Override
5656
public <Req, Res> CompletableFuture<ClientResponse<Res>> callAsync(Request<Req, Res> request) {
57-
Serde<Req> reqSerde = this.serdeFactory.create(request.requestTypeTag());
58-
Serde<Res> resSerde = this.serdeFactory.create(request.responseTypeTag());
57+
Serde<Req> reqSerde = this.serdeFactory.create(request.getRequestTypeTag());
58+
Serde<Res> resSerde = this.serdeFactory.create(request.getResponseTypeTag());
5959

60-
URI requestUri = toRequestURI(request.target(), false, null);
60+
URI requestUri = toRequestURI(request.getTarget(), false, null);
6161
Stream<Map.Entry<String, String>> headersStream =
6262
Stream.concat(
63-
baseOptions.headers().entrySet().stream(), request.headers().entrySet().stream());
63+
baseOptions.headers().entrySet().stream(),
64+
request.getHeaders() == null
65+
? Stream.empty()
66+
: request.getHeaders().entrySet().stream());
6467
if (reqSerde.contentType() != null) {
6568
headersStream =
6669
Stream.concat(
6770
headersStream, Stream.of(Map.entry("content-type", reqSerde.contentType())));
6871
}
69-
if (request.idempotencyKey() != null) {
72+
if (request.getIdempotencyKey() != null) {
7073
headersStream =
7174
Stream.concat(
72-
headersStream, Stream.of(Map.entry("idempotency-key", request.idempotencyKey())));
75+
headersStream, Stream.of(Map.entry("idempotency-key", request.getIdempotencyKey())));
7376
}
74-
Slice requestBody = reqSerde.serialize(request.request());
77+
Slice requestBody = reqSerde.serialize(request.getRequest());
7578

7679
return doPostRequest(
7780
requestUri, headersStream, requestBody, callResponseMapper("POST", requestUri, resSerde));
7881
}
7982

8083
@Override
8184
public <Req, Res> CompletableFuture<ClientResponse<SendResponse<Res>>> sendAsync(
82-
Request<Req, Res> request) {
83-
Serde<Req> reqSerde = this.serdeFactory.create(request.requestTypeTag());
84-
85-
URI requestUri =
86-
toRequestURI(
87-
request.target(),
88-
true,
89-
(request instanceof SendRequest<Req, Res> sendRequest) ? sendRequest.delay() : null);
85+
Request<Req, Res> request, @Nullable Duration delay) {
86+
Serde<Req> reqSerde = this.serdeFactory.create(request.getRequestTypeTag());
87+
88+
URI requestUri = toRequestURI(request.getTarget(), true, delay);
9089
Stream<Map.Entry<String, String>> headersStream =
9190
Stream.concat(
92-
baseOptions.headers().entrySet().stream(), request.headers().entrySet().stream());
91+
baseOptions.headers().entrySet().stream(),
92+
request.getHeaders() == null
93+
? Stream.empty()
94+
: request.getHeaders().entrySet().stream());
9395
if (reqSerde.contentType() != null) {
9496
headersStream =
9597
Stream.concat(
9698
headersStream, Stream.of(Map.entry("content-type", reqSerde.contentType())));
9799
}
98-
if (request.idempotencyKey() != null) {
100+
if (request.getIdempotencyKey() != null) {
99101
headersStream =
100102
Stream.concat(
101-
headersStream, Stream.of(Map.entry("idempotency-key", request.idempotencyKey())));
103+
headersStream, Stream.of(Map.entry("idempotency-key", request.getIdempotencyKey())));
102104
}
103-
Slice requestBody = reqSerde.serialize(request.request());
105+
Slice requestBody = reqSerde.serialize(request.getRequest());
104106

105107
return doPostRequest(
106108
requestUri,
@@ -157,7 +159,8 @@ public <Req, Res> CompletableFuture<ClientResponse<SendResponse<Res>>> sendAsync
157159
statusCode,
158160
responseHeaders,
159161
new SendResponse<>(
160-
status, invocationHandle(fields.get("invocationId"), request.responseTypeTag())));
162+
status,
163+
invocationHandle(fields.get("invocationId"), request.getResponseTypeTag())));
161164
});
162165
}
163166

@@ -427,7 +430,7 @@ private String targetToURI(Target target) {
427430
return builder.toString();
428431
}
429432

430-
private URI toRequestURI(Target target, boolean isSend, Duration delay) {
433+
private URI toRequestURI(Target target, boolean isSend, @Nullable Duration delay) {
431434
StringBuilder builder = new StringBuilder(targetToURI(target));
432435
if (isSend) {
433436
builder.append("/send");

0 commit comments

Comments
 (0)