Skip to content

Commit 4657365

Browse files
Support named entries, and allow users to set side effects name.
This commit adds support in the state machine to propagate additional entry info about "failing entries" in the EndMessage, and allows the user to set the name for side effect entries.
1 parent 4868e29 commit 4657365

File tree

18 files changed

+333
-104
lines changed

18 files changed

+333
-104
lines changed

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,16 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
130130
}
131131
}
132132

133-
override suspend fun <T : Any?> runBlock(serde: Serde<T>, block: suspend () -> T): T {
133+
override suspend fun <T : Any?> runBlock(
134+
serde: Serde<T>,
135+
name: String,
136+
block: suspend () -> T
137+
): T {
134138
val exitResult =
135139
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteString>>
136140
->
137141
syscalls.enterSideEffectBlock(
142+
name,
138143
object : EnterSideEffectSyscallCallback {
139144
override fun onSuccess(t: ByteString?) {
140145
val deferred: CompletableDeferred<ByteString> = CompletableDeferred()

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ sealed interface Context {
106106
* suspension point) without re-executing the closure. Use this feature if you want to perform
107107
* <b>non-deterministic operations</b>.
108108
*
109+
* You can name this closure using the `name` parameter. This name will be available in the
110+
* observability tools.
111+
*
109112
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
110113
* times until it records a result.
111114
*
@@ -138,11 +141,12 @@ sealed interface Context {
138141
* To propagate failures to the run call-site, make sure to wrap them in [TerminalException].
139142
*
140143
* @param serde the type tag of the return value, used to serialize/deserialize it.
144+
* @param name the name of the side effect.
141145
* @param block closure to execute.
142146
* @param T type of the return value.
143147
* @return value of the runBlock operation.
144148
*/
145-
suspend fun <T : Any?> runBlock(serde: Serde<T>, block: suspend () -> T): T
149+
suspend fun <T : Any?> runBlock(serde: Serde<T>, name: String = "", block: suspend () -> T): T
146150

147151
/**
148152
* Create an [Awakeable], addressable through [Awakeable.id].
@@ -221,8 +225,11 @@ sealed interface Context {
221225
* @param T type of the return value.
222226
* @return value of the runBlock operation.
223227
*/
224-
suspend inline fun <reified T : Any> Context.runBlock(noinline block: suspend () -> T): T {
225-
return this.runBlock(KtSerdes.json(), block)
228+
suspend inline fun <reified T : Any> Context.runBlock(
229+
name: String = "",
230+
noinline block: suspend () -> T
231+
): T {
232+
return this.runBlock(KtSerdes.json(), name, block)
226233
}
227234

228235
/**

sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ class SideEffectTest : SideEffectTestSuite() {
2626
"Hello $result"
2727
}
2828

29+
override fun namedSideEffect(name: String, sideEffectOutput: String): TestInvocationBuilder =
30+
testDefinitionForService("SideEffect") { ctx, _: Unit ->
31+
val result = ctx.runBlock(name) { sideEffectOutput }
32+
"Hello $result"
33+
}
34+
2935
override fun consecutiveSideEffect(sideEffectOutput: String): TestInvocationBuilder =
3036
testDefinitionForService("ConsecutiveSideEffect") { ctx, _: Unit ->
3137
val firstResult = ctx.runBlock { sideEffectOutput }
@@ -54,4 +60,9 @@ class SideEffectTest : SideEffectTestSuite() {
5460
ctx.runBlock { ctx.send(GREETER_SERVICE_TARGET, KtSerdes.json(), "something") }
5561
throw IllegalStateException("This point should not be reached")
5662
}
63+
64+
override fun failingSideEffect(name: String, reason: String): TestInvocationBuilder =
65+
testDefinitionForService<Unit, String>("FailingSideEffect") { ctx, _: Unit ->
66+
ctx.runBlock(name) { throw IllegalStateException(reason) }
67+
}
5768
}

sdk-api/src/main/java/dev/restate/sdk/Context.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ default void sleep(Duration duration) {
100100
* suspension point) without re-executing the closure. Use this feature if you want to perform
101101
* <b>non-deterministic operations</b>.
102102
*
103+
* <p>You can name this closure using the {@code name} parameter. This name will be available in
104+
* the observability tools.
105+
*
103106
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
104107
* times until it records a result.
105108
*
@@ -133,23 +136,35 @@ default void sleep(Duration duration) {
133136
* To propagate run failures to the call-site, make sure to wrap them in {@link
134137
* TerminalException}.
135138
*
139+
* @param name name of the side effect.
136140
* @param serde the type tag of the return value, used to serialize/deserialize it.
137141
* @param action closure to execute.
138142
* @param <T> type of the return value.
139143
* @return value of the run operation.
140144
*/
141-
<T> T run(Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException;
145+
<T> T run(String name, Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException;
142146

143-
/** Like {@link #run(Serde, ThrowingSupplier)}, but without returning a value. */
144-
default void run(ThrowingRunnable runnable) throws TerminalException {
147+
/** Like {@link #run(String, Serde, ThrowingSupplier)}, but without returning a value. */
148+
default void run(String name, ThrowingRunnable runnable) throws TerminalException {
145149
run(
150+
name,
146151
CoreSerdes.VOID,
147152
() -> {
148153
runnable.run();
149154
return null;
150155
});
151156
}
152157

158+
/** Like {@link #run(String, Serde, ThrowingSupplier)}, but without a name. */
159+
default <T> T run(Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException {
160+
return run(null, serde, action);
161+
}
162+
163+
/** Like {@link #run(String, ThrowingRunnable)}, but without a name. */
164+
default void run(ThrowingRunnable runnable) throws TerminalException {
165+
run(null, runnable);
166+
}
167+
153168
/**
154169
* Create an {@link Awakeable}, addressable through {@link Awakeable#id()}.
155170
*

sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,10 @@ public <T> void send(Target target, Serde<T> inputSerde, T parameter, Duration d
110110
}
111111

112112
@Override
113-
public <T> T run(Serde<T> serde, ThrowingSupplier<T> action) {
113+
public <T> T run(String name, Serde<T> serde, ThrowingSupplier<T> action) {
114114
CompletableFuture<CompletableFuture<ByteString>> enterFut = new CompletableFuture<>();
115115
syscalls.enterSideEffectBlock(
116+
name,
116117
new EnterSideEffectSyscallCallback() {
117118
@Override
118119
public void onNotExecuted() {

sdk-api/src/test/java/dev/restate/sdk/SideEffectTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
public class SideEffectTest extends SideEffectTestSuite {
2020

21+
@Override
2122
protected TestInvocationBuilder sideEffect(String sideEffectOutput) {
2223
return testDefinitionForService(
2324
"SideEffect",
@@ -29,6 +30,19 @@ protected TestInvocationBuilder sideEffect(String sideEffectOutput) {
2930
});
3031
}
3132

33+
@Override
34+
protected TestInvocationBuilder namedSideEffect(String name, String sideEffectOutput) {
35+
return testDefinitionForService(
36+
"SideEffect",
37+
CoreSerdes.VOID,
38+
CoreSerdes.JSON_STRING,
39+
(ctx, unused) -> {
40+
String result = ctx.run(name, CoreSerdes.JSON_STRING, () -> sideEffectOutput);
41+
return "Hello " + result;
42+
});
43+
}
44+
45+
@Override
3246
protected TestInvocationBuilder consecutiveSideEffect(String sideEffectOutput) {
3347
return testDefinitionForService(
3448
"ConsecutiveSideEffect",
@@ -42,6 +56,7 @@ protected TestInvocationBuilder consecutiveSideEffect(String sideEffectOutput) {
4256
});
4357
}
4458

59+
@Override
4560
protected TestInvocationBuilder checkContextSwitching() {
4661
return testDefinitionForService(
4762
"CheckContextSwitching",
@@ -65,6 +80,7 @@ protected TestInvocationBuilder checkContextSwitching() {
6580
});
6681
}
6782

83+
@Override
6884
protected TestInvocationBuilder sideEffectGuard() {
6985
return testDefinitionForService(
7086
"SideEffectGuard",
@@ -75,4 +91,20 @@ protected TestInvocationBuilder sideEffectGuard() {
7591
throw new IllegalStateException("This point should not be reached");
7692
});
7793
}
94+
95+
@Override
96+
protected TestInvocationBuilder failingSideEffect(String name, String reason) {
97+
return testDefinitionForService(
98+
"FailingSideEffect",
99+
CoreSerdes.VOID,
100+
CoreSerdes.JSON_STRING,
101+
(ctx, unused) -> {
102+
ctx.run(
103+
name,
104+
() -> {
105+
throw new IllegalStateException(reason);
106+
});
107+
return null;
108+
});
109+
}
78110
}

sdk-common/src/main/java/dev/restate/sdk/common/syscalls/Syscalls.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ void send(
6868
@Nullable Duration delay,
6969
SyscallCallback<Void> requestCallback);
7070

71-
void enterSideEffectBlock(EnterSideEffectSyscallCallback callback);
71+
void enterSideEffectBlock(@Nullable String name, EnterSideEffectSyscallCallback callback);
7272

7373
void exitSideEffectBlock(ByteString toWrite, ExitSideEffectSyscallCallback callback);
7474

sdk-core/src/main/java/dev/restate/sdk/core/Entries.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ final class Entries {
2626
private Entries() {}
2727

2828
abstract static class JournalEntry<E extends MessageLite> {
29+
abstract String getName(E expected);
30+
2931
void checkEntryHeader(E expected, MessageLite actual) throws ProtocolException {}
3032

3133
abstract void trace(E expected, Span span);
@@ -57,6 +59,11 @@ static final class OutputEntry extends JournalEntry<OutputEntryMessage> {
5759

5860
private OutputEntry() {}
5961

62+
@Override
63+
String getName(OutputEntryMessage expected) {
64+
return expected.getName();
65+
}
66+
6067
@Override
6168
public void trace(OutputEntryMessage expected, Span span) {
6269
span.addEvent("Output");
@@ -81,6 +88,11 @@ public boolean hasResult(GetStateEntryMessage actual) {
8188
return actual.getResultCase() != GetStateEntryMessage.ResultCase.RESULT_NOT_SET;
8289
}
8390

91+
@Override
92+
String getName(GetStateEntryMessage expected) {
93+
return expected.getName();
94+
}
95+
8496
@Override
8597
void checkEntryHeader(GetStateEntryMessage expected, MessageLite actual)
8698
throws ProtocolException {
@@ -163,6 +175,11 @@ public boolean hasResult(GetStateKeysEntryMessage actual) {
163175
return actual.getResultCase() != GetStateKeysEntryMessage.ResultCase.RESULT_NOT_SET;
164176
}
165177

178+
@Override
179+
String getName(GetStateKeysEntryMessage expected) {
180+
return expected.getName();
181+
}
182+
166183
@Override
167184
void checkEntryHeader(GetStateKeysEntryMessage expected, MessageLite actual)
168185
throws ProtocolException {
@@ -232,6 +249,11 @@ public void trace(ClearStateEntryMessage expected, Span span) {
232249
"ClearState", Attributes.of(Tracing.RESTATE_STATE_KEY, expected.getKey().toString()));
233250
}
234251

252+
@Override
253+
String getName(ClearStateEntryMessage expected) {
254+
return expected.getName();
255+
}
256+
235257
@Override
236258
void checkEntryHeader(ClearStateEntryMessage expected, MessageLite actual)
237259
throws ProtocolException {
@@ -256,6 +278,11 @@ public void trace(ClearAllStateEntryMessage expected, Span span) {
256278
span.addEvent("ClearAllState");
257279
}
258280

281+
@Override
282+
String getName(ClearAllStateEntryMessage expected) {
283+
return expected.getName();
284+
}
285+
259286
@Override
260287
void checkEntryHeader(ClearAllStateEntryMessage expected, MessageLite actual)
261288
throws ProtocolException {
@@ -281,6 +308,11 @@ public void trace(SetStateEntryMessage expected, Span span) {
281308
"SetState", Attributes.of(Tracing.RESTATE_STATE_KEY, expected.getKey().toString()));
282309
}
283310

311+
@Override
312+
String getName(SetStateEntryMessage expected) {
313+
return expected.getName();
314+
}
315+
284316
@Override
285317
void checkEntryHeader(SetStateEntryMessage expected, MessageLite actual)
286318
throws ProtocolException {
@@ -305,6 +337,11 @@ static final class SleepEntry extends CompletableJournalEntry<SleepEntryMessage,
305337

306338
private SleepEntry() {}
307339

340+
@Override
341+
String getName(SleepEntryMessage expected) {
342+
return expected.getName();
343+
}
344+
308345
@Override
309346
void trace(SleepEntryMessage expected, Span span) {
310347
span.addEvent(
@@ -362,6 +399,11 @@ public boolean hasResult(InvokeEntryMessage actual) {
362399
return actual.getResultCase() != Protocol.InvokeEntryMessage.ResultCase.RESULT_NOT_SET;
363400
}
364401

402+
@Override
403+
String getName(InvokeEntryMessage expected) {
404+
return expected.getName();
405+
}
406+
365407
@Override
366408
void checkEntryHeader(InvokeEntryMessage expected, MessageLite actual)
367409
throws ProtocolException {
@@ -414,6 +456,11 @@ public void trace(BackgroundInvokeEntryMessage expected, Span span) {
414456
expected.getMethodName()));
415457
}
416458

459+
@Override
460+
String getName(BackgroundInvokeEntryMessage expected) {
461+
return expected.getName();
462+
}
463+
417464
@Override
418465
void checkEntryHeader(BackgroundInvokeEntryMessage expected, MessageLite actual)
419466
throws ProtocolException {
@@ -427,6 +474,11 @@ static final class AwakeableEntry
427474

428475
private AwakeableEntry() {}
429476

477+
@Override
478+
String getName(AwakeableEntryMessage expected) {
479+
return expected.getName();
480+
}
481+
430482
@Override
431483
void trace(AwakeableEntryMessage expected, Span span) {
432484
span.addEvent("Awakeable");
@@ -468,6 +520,11 @@ public void trace(CompleteAwakeableEntryMessage expected, Span span) {
468520
span.addEvent("CompleteAwakeable");
469521
}
470522

523+
@Override
524+
String getName(CompleteAwakeableEntryMessage expected) {
525+
return expected.getName();
526+
}
527+
471528
@Override
472529
void checkEntryHeader(CompleteAwakeableEntryMessage expected, MessageLite actual)
473530
throws ProtocolException {

sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingSyscalls.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ public void send(
8888
}
8989

9090
@Override
91-
public void enterSideEffectBlock(EnterSideEffectSyscallCallback callback) {
92-
syscallsExecutor.execute(() -> syscalls.enterSideEffectBlock(callback));
91+
public void enterSideEffectBlock(String name, EnterSideEffectSyscallCallback callback) {
92+
syscallsExecutor.execute(() -> syscalls.enterSideEffectBlock(name, callback));
9393
}
9494

9595
@Override

0 commit comments

Comments
 (0)