Skip to content

Commit 58e83b2

Browse files
Client factory spi (#486)
* Add default Serde factory provider * Improve suspend overloads for kotlin clients * Spotless apply
1 parent a25c70a commit 58e83b2

File tree

9 files changed

+160
-15
lines changed

9 files changed

+160
-15
lines changed

client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import dev.restate.client.SendResponse
1515
import dev.restate.common.Output
1616
import dev.restate.common.Request
1717
import dev.restate.common.WorkflowRequest
18-
import dev.restate.serde.Serde
18+
import dev.restate.serde.TypeTag
19+
import dev.restate.serde.kotlinx.typeTag
1920
import kotlin.time.Duration
2021
import kotlin.time.toJavaDuration
2122
import kotlinx.coroutines.future.await
@@ -71,11 +72,18 @@ suspend fun <Req, Res> Client.submitSuspend(
7172
}
7273

7374
suspend fun <T : Any> Client.AwakeableHandle.resolveSuspend(
74-
serde: Serde<T>,
75+
typeTag: TypeTag<T>,
7576
payload: T,
7677
options: ClientRequestOptions = ClientRequestOptions.DEFAULT
7778
): ClientResponse<Void> {
78-
return this.resolveAsync(serde, payload, options).await()
79+
return this.resolveAsync(typeTag, payload, options).await()
80+
}
81+
82+
suspend inline fun <reified T : Any> Client.AwakeableHandle.resolveSuspend(
83+
payload: T,
84+
options: ClientRequestOptions = ClientRequestOptions.DEFAULT
85+
): ClientResponse<Void> {
86+
return this.resolveSuspend(typeTag<T>(), payload, options)
7987
}
8088

8189
suspend fun Client.AwakeableHandle.rejectSuspend(

client/src/main/java/dev/restate/client/Client.java

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import dev.restate.common.Request;
1313
import dev.restate.common.Target;
1414
import dev.restate.common.WorkflowRequest;
15-
import dev.restate.serde.Serde;
1615
import dev.restate.serde.SerdeFactory;
1716
import dev.restate.serde.TypeTag;
1817
import java.time.Duration;
@@ -123,6 +122,34 @@ default <T> ClientResponse<Void> resolve(
123122
}
124123
}
125124

125+
/**
126+
* Complete with success the Awakeable.
127+
*
128+
* @param clazz used to serialize the Awakeable result payload.
129+
* @param payload the result payload. MUST NOT be null.
130+
*/
131+
default <T> ClientResponse<Void> resolve(Class<T> clazz, @NonNull T payload) {
132+
return this.resolve(TypeTag.of(clazz), payload, ClientRequestOptions.DEFAULT);
133+
}
134+
135+
/** Same as {@link #resolve(Class, Object)} but async with options. */
136+
default <T> CompletableFuture<ClientResponse<Void>> resolveAsync(
137+
Class<T> clazz, @NonNull T payload, ClientRequestOptions options) {
138+
return this.resolveAsync(TypeTag.of(clazz), payload, options);
139+
}
140+
141+
/** Same as {@link #resolve(TypeTag, Object)} but async. */
142+
default <T> CompletableFuture<ClientResponse<Void>> resolveAsync(
143+
Class<T> clazz, @NonNull T payload) {
144+
return resolveAsync(TypeTag.of(clazz), payload, ClientRequestOptions.DEFAULT);
145+
}
146+
147+
/** Same as {@link #resolve(TypeTag, Object)} with options. */
148+
default <T> ClientResponse<Void> resolve(
149+
Class<T> clazz, @NonNull T payload, ClientRequestOptions options) {
150+
return resolve(TypeTag.of(clazz), payload, options);
151+
}
152+
126153
/**
127154
* Complete with success the Awakeable.
128155
*
@@ -331,7 +358,7 @@ default ClientResponse<Output<Res>> getOutput() throws IngressException {
331358
* @param baseUri uri to connect to.
332359
*/
333360
static Client connect(String baseUri) {
334-
return connect(baseUri, SerdeFactory.NOOP, ClientRequestOptions.DEFAULT);
361+
return connect(baseUri, null, null);
335362
}
336363

337364
/**
@@ -341,15 +368,14 @@ static Client connect(String baseUri) {
341368
* @param options default options to use in all the requests.
342369
*/
343370
static Client connect(String baseUri, ClientRequestOptions options) {
344-
return connect(baseUri, SerdeFactory.NOOP, options);
371+
return connect(baseUri, null, options);
345372
}
346373

347374
/**
348375
* Create a default JDK client.
349376
*
350377
* @param baseUri uri to connect to
351-
* @param serdeFactory Serde factory to use. You must provide this when the provided {@link
352-
* TypeTag} are not {@link Serde} instances. If you're just wrapping this client in a
378+
* @param serdeFactory Serde factory to use. If you're just wrapping this client in a
353379
* code-generated client, you don't need to provide this parameter.
354380
*/
355381
static Client connect(String baseUri, SerdeFactory serdeFactory) {
@@ -360,8 +386,7 @@ static Client connect(String baseUri, SerdeFactory serdeFactory) {
360386
* Create a default JDK client.
361387
*
362388
* @param baseUri uri to connect to
363-
* @param serdeFactory Serde factory to use. You must provide this when the provided {@link
364-
* TypeTag} are not {@link Serde} instances. If you're just wrapping this client in a
389+
* @param serdeFactory Serde factory to use. If you're just wrapping this client in a
365390
* code-generated client, you don't need to provide this parameter.
366391
* @param options default options to use in all the requests.
367392
*/

client/src/main/java/dev/restate/client/base/BaseClient.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import dev.restate.common.*;
1616
import dev.restate.serde.Serde;
1717
import dev.restate.serde.SerdeFactory;
18+
import dev.restate.serde.TypeRef;
1819
import dev.restate.serde.TypeTag;
20+
import dev.restate.serde.provider.DefaultSerdeFactoryProvider;
1921
import java.io.ByteArrayInputStream;
2022
import java.io.IOException;
2123
import java.io.InputStream;
@@ -26,6 +28,8 @@
2628
import java.util.*;
2729
import java.util.concurrent.CompletableFuture;
2830
import java.util.stream.Stream;
31+
import org.apache.logging.log4j.LogManager;
32+
import org.apache.logging.log4j.Logger;
2933
import org.jetbrains.annotations.NotNull;
3034
import org.jspecify.annotations.NonNull;
3135
import org.jspecify.annotations.Nullable;
@@ -42,13 +46,19 @@ public abstract class BaseClient implements Client {
4246
private final SerdeFactory serdeFactory;
4347
private final ClientRequestOptions baseOptions;
4448

45-
protected BaseClient(URI baseUri, SerdeFactory serdeFactory, ClientRequestOptions baseOptions) {
49+
protected BaseClient(
50+
URI baseUri,
51+
@Nullable SerdeFactory serdeFactory,
52+
@Nullable ClientRequestOptions baseOptions) {
4653
this.baseUri = Objects.requireNonNull(baseUri, "Base uri cannot be null");
4754
if (!this.baseUri.isAbsolute()) {
4855
throw new IllegalArgumentException(
4956
"The base uri " + baseUri + " is not absolute. This is not supported.");
5057
}
51-
this.serdeFactory = serdeFactory == null ? SerdeFactory.NOOP : serdeFactory;
58+
this.serdeFactory =
59+
serdeFactory == null
60+
? DefaultSerdeFactorySingleton.INSTANCE.getLoadedFactory()
61+
: serdeFactory;
5262
this.baseOptions = baseOptions == null ? ClientRequestOptions.DEFAULT : baseOptions;
5363
}
5464

@@ -539,4 +549,50 @@ private static Map<String, String> findStringFieldsInJsonObject(
539549

540550
return resultMap;
541551
}
552+
553+
// Machinery to load default serde factory
554+
555+
private static class DefaultSerdeFactorySingleton {
556+
private static final DefaultSerdeFactory INSTANCE = new DefaultSerdeFactory();
557+
}
558+
559+
public static final class DefaultSerdeFactory {
560+
561+
private static final Logger LOG = LogManager.getLogger(DefaultSerdeFactory.class);
562+
563+
private final SerdeFactory loadedFactory;
564+
565+
public DefaultSerdeFactory() {
566+
var loadedFactories = ServiceLoader.load(DefaultSerdeFactoryProvider.class).stream().toList();
567+
if (loadedFactories.size() == 1) {
568+
this.loadedFactory = loadedFactories.get(0).get().create();
569+
} else {
570+
this.loadedFactory =
571+
new SerdeFactory() {
572+
@Override
573+
public <T> Serde<T> create(TypeRef<T> typeRef) {
574+
throw new UnsupportedOperationException(
575+
"No SerdeFactory class was configured. Please configure one, this is required when using TypeTag and Class in client methods.");
576+
}
577+
578+
@Override
579+
public <T> Serde<T> create(Class<T> clazz) {
580+
throw new UnsupportedOperationException(
581+
"No SerdeFactory class was configured. Please configure one, this is required when using TypeTag and Class in client methods.");
582+
}
583+
};
584+
}
585+
586+
if (loadedFactories.size() > 1) {
587+
LOG.warn(
588+
"When creating the Client, more than one SerdeFactory was found.\n"
589+
+ "To prevent unexpected behavior, the client was configured without SerdeFactory. "
590+
+ "Please manually provide the SerdeFactory (e.g. JacksonSerdeFactory or KotlinxSerializationSerdeFactory) of your choice in the Client.connect() factory methods.");
591+
}
592+
}
593+
594+
public SerdeFactory getLoadedFactory() {
595+
return loadedFactory;
596+
}
597+
}
542598
}

client/src/main/java/dev/restate/client/jdk/JdkClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,14 @@ public Map<String, String> toLowercaseMap() {
116116
public static JdkClient of(
117117
HttpClient httpClient,
118118
String baseUri,
119-
SerdeFactory serdeFactory,
120-
ClientRequestOptions options) {
119+
@Nullable SerdeFactory serdeFactory,
120+
@Nullable ClientRequestOptions options) {
121121
return new JdkClient(URI.create(baseUri), serdeFactory, options, httpClient);
122122
}
123123

124124
/** Create a new JDK Client */
125125
public static JdkClient of(
126-
String baseUri, SerdeFactory serdeFactory, ClientRequestOptions options) {
126+
String baseUri, @Nullable SerdeFactory serdeFactory, @Nullable ClientRequestOptions options) {
127127
return new JdkClient(URI.create(baseUri), serdeFactory, options, HttpClient.newHttpClient());
128128
}
129129
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate Java SDK,
4+
// which is released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
9+
package dev.restate.serde.provider;
10+
11+
import dev.restate.serde.SerdeFactory;
12+
13+
/**
14+
* This class is used to autoload either JacksonSerdeFactory or KotlinxSerializationSerdeFactory in
15+
* the client.
16+
*/
17+
public interface DefaultSerdeFactoryProvider {
18+
19+
SerdeFactory create();
20+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate Java SDK,
4+
// which is released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
9+
package dev.restate.serde.jackson;
10+
11+
import dev.restate.serde.SerdeFactory;
12+
import dev.restate.serde.provider.DefaultSerdeFactoryProvider;
13+
14+
public class JacksonSerdeFactoryProvider implements DefaultSerdeFactoryProvider {
15+
@Override
16+
public SerdeFactory create() {
17+
return new JacksonSerdeFactory();
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
dev.restate.serde.jackson.JacksonSerdeFactoryProvider
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate Java SDK,
4+
// which is released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
9+
package dev.restate.serde.kotlinx
10+
11+
import dev.restate.serde.provider.DefaultSerdeFactoryProvider
12+
13+
public class KotlinSerializationSerdeFactoryProvider : DefaultSerdeFactoryProvider {
14+
override fun create() = KotlinSerializationSerdeFactory()
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
dev.restate.serde.kotlinx.KotlinSerializationSerdeFactoryProvider

0 commit comments

Comments
 (0)