Skip to content

Commit cca0f96

Browse files
More implicitness in kotlin (#277)
We had to rename ctx.run to ctx.runBlock in kotlin to avoid clashing with stdlib run extension method.
1 parent f8244e2 commit cca0f96

File tree

11 files changed

+108
-40
lines changed

11 files changed

+108
-40
lines changed

examples/src/main/kotlin/my/restate/sdk/examples/CounterKt.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@ package my.restate.sdk.examples
1010

1111
import dev.restate.sdk.annotation.Handler
1212
import dev.restate.sdk.annotation.VirtualObject
13-
import dev.restate.sdk.common.StateKey
1413
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder
15-
import dev.restate.sdk.kotlin.KtSerdes
14+
import dev.restate.sdk.kotlin.KtStateKey
1615
import dev.restate.sdk.kotlin.ObjectContext
1716
import kotlinx.serialization.Serializable
1817
import org.apache.logging.log4j.LogManager
@@ -24,7 +23,7 @@ import org.apache.logging.log4j.Logger
2423
class CounterKt {
2524

2625
companion object {
27-
private val TOTAL = StateKey.of<Long>("total", KtSerdes.json())
26+
private val TOTAL = KtStateKey.json<Long>("total")
2827
private val LOG: Logger = LogManager.getLogger(CounterKt::class.java)
2928
}
3029

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
130130
}
131131
}
132132

133-
override suspend fun <T : Any?> run(serde: Serde<T>, sideEffectAction: suspend () -> T): T {
133+
override suspend fun <T : Any?> runBlock(serde: Serde<T>, block: suspend () -> T): T {
134134
val exitResult =
135135
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteString>>
136136
->
@@ -165,7 +165,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
165165
var actionReturnValue: T? = null
166166
var actionFailure: TerminalException? = null
167167
try {
168-
actionReturnValue = sideEffectAction()
168+
actionReturnValue = block()
169169
} catch (e: TerminalException) {
170170
actionFailure = e
171171
} catch (e: Error) {

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/KtSerdes.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,21 @@ package dev.restate.sdk.kotlin
1010

1111
import com.google.protobuf.ByteString
1212
import dev.restate.sdk.common.Serde
13+
import dev.restate.sdk.common.StateKey
1314
import java.nio.charset.StandardCharsets
1415
import kotlin.reflect.typeOf
1516
import kotlinx.serialization.KSerializer
1617
import kotlinx.serialization.json.Json
1718
import kotlinx.serialization.serializer
1819

20+
object KtStateKey {
21+
22+
/** Creates a json [StateKey]. */
23+
inline fun <reified T> json(name: String): StateKey<T> {
24+
return StateKey.of(name, KtSerdes.json())
25+
}
26+
}
27+
1928
object KtSerdes {
2029

2130
/** Creates a [Serde] implementation using the `kotlinx.serialization` json module. */

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt

Lines changed: 79 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,9 @@ sealed interface Context {
114114
* Errors occurring within this closure won't be propagated to the caller, unless they are
115115
* [TerminalException]. Consider the following code:
116116
* ```
117-
* // Bad usage of try-catch outside the run
117+
* // Bad usage of try-catch outside the runBlock
118118
* try {
119-
* ctx.run {
119+
* ctx.runBlock {
120120
* throw IllegalStateException();
121121
* };
122122
* } catch (e: IllegalStateException) {
@@ -125,9 +125,9 @@ sealed interface Context {
125125
* // following the invocation retry policy.
126126
* }
127127
*
128-
* // Good usage of try-catch outside the run
128+
* // Good usage of try-catch outside the runBlock
129129
* try {
130-
* ctx.run {
130+
* ctx.runBlock {
131131
* throw TerminalException("my error");
132132
* };
133133
* } catch (e: TerminalException) {
@@ -138,16 +138,11 @@ sealed interface Context {
138138
* To propagate failures to the run call-site, make sure to wrap them in [TerminalException].
139139
*
140140
* @param serde the type tag of the return value, used to serialize/deserialize it.
141-
* @param action closure to execute.
141+
* @param block closure to execute.
142142
* @param T type of the return value.
143-
* @return value of the run operation.
143+
* @return value of the runBlock operation.
144144
*/
145-
suspend fun <T : Any?> run(serde: Serde<T>, sideEffectAction: suspend () -> T): T
146-
147-
/** Like [run] without a return value. */
148-
suspend fun run(sideEffectAction: suspend () -> Unit) {
149-
run(KtSerdes.UNIT, sideEffectAction)
150-
}
145+
suspend fun <T : Any?> runBlock(serde: Serde<T>, block: suspend () -> T): T
151146

152147
/**
153148
* Create an [Awakeable], addressable through [Awakeable.id].
@@ -176,15 +171,74 @@ sealed interface Context {
176171
*
177172
* This instance is useful to generate identifiers, idempotency keys, and for uniform sampling
178173
* from a set of options. If a cryptographically secure value is needed, please generate that
179-
* externally using [run].
174+
* externally using [runBlock].
180175
*
181-
* You MUST NOT use this [Random] instance inside a [run].
176+
* You MUST NOT use this [Random] instance inside a [runBlock].
182177
*
183178
* @return the [Random] instance.
184179
*/
185180
fun random(): RestateRandom
186181
}
187182

183+
/**
184+
* Execute a non-deterministic closure, recording the result value in the journal using
185+
* [KtSerdes.json]. The result value will be re-played in case of re-invocation (e.g. because of
186+
* failure recovery or suspension point) without re-executing the closure. Use this feature if you
187+
* want to perform <b>non-deterministic operations</b>.
188+
*
189+
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
190+
* times until it records a result.
191+
*
192+
* <h2>Error handling</h2>
193+
*
194+
* Errors occurring within this closure won't be propagated to the caller, unless they are
195+
* [TerminalException]. Consider the following code:
196+
* ```
197+
* // Bad usage of try-catch outside the runBlock
198+
* try {
199+
* ctx.runBlock {
200+
* throw IllegalStateException();
201+
* };
202+
* } catch (e: IllegalStateException) {
203+
* // This will never be executed,
204+
* // but the error will be retried by Restate,
205+
* // following the invocation retry policy.
206+
* }
207+
*
208+
* // Good usage of try-catch outside the runBlock
209+
* try {
210+
* ctx.runBlock {
211+
* throw TerminalException("my error");
212+
* };
213+
* } catch (e: TerminalException) {
214+
* // This is invoked
215+
* }
216+
* ```
217+
*
218+
* To propagate failures to the run call-site, make sure to wrap them in [TerminalException].
219+
*
220+
* @param block closure to execute.
221+
* @param T type of the return value.
222+
* @return value of the runBlock operation.
223+
*/
224+
suspend inline fun <reified T : Any> Context.runBlock(noinline block: suspend () -> T): T {
225+
return this.runBlock(KtSerdes.json(), block)
226+
}
227+
228+
/**
229+
* Create an [Awakeable] using [KtSerdes.json] deserializer, addressable through [Awakeable.id].
230+
*
231+
* You can use this feature to implement external asynchronous systems interactions, for example you
232+
* can send a Kafka record including the [Awakeable.id], and then let another service consume from
233+
* Kafka the responses of given external system interaction by using [awakeableHandle].
234+
*
235+
* @return the [Awakeable] to await on.
236+
* @see Awakeable
237+
*/
238+
suspend inline fun <reified T : Any> Context.awakeable(): Awakeable<T> {
239+
return this.awakeable(KtSerdes.json())
240+
}
241+
188242
/**
189243
* This interface extends [Context] adding access to the virtual object instance key-value state
190244
* storage.
@@ -233,7 +287,7 @@ class RestateRandom(seed: Long, private val syscalls: Syscalls) : Random() {
233287
private val r = Random(seed)
234288

235289
override fun nextBits(bitCount: Int): Int {
236-
check(!syscalls.isInsideSideEffect) { "You can't use RestateRandom inside ctx.run!" }
290+
check(!syscalls.isInsideSideEffect) { "You can't use RestateRandom inside ctx.runBlock!" }
237291
return r.nextBits(bitCount)
238292
}
239293

@@ -369,3 +423,13 @@ sealed interface AwakeableHandle {
369423
*/
370424
suspend fun reject(reason: String)
371425
}
426+
427+
/**
428+
* Complete with success the [Awakeable] using [KtSerdes.json] serializer.
429+
*
430+
* @param payload the result payload.
431+
* @see Awakeable
432+
*/
433+
suspend inline fun <reified T : Any> AwakeableHandle.resolve(payload: T) {
434+
return this.resolve(KtSerdes.json(), payload)
435+
}

sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/AwakeableIdTest.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
99
package dev.restate.sdk.kotlin
1010

11-
import dev.restate.sdk.common.CoreSerdes
1211
import dev.restate.sdk.core.AwakeableIdTestSuite
1312
import dev.restate.sdk.core.TestDefinitions
1413
import dev.restate.sdk.kotlin.KotlinCoroutinesTests.Companion.testDefinitionForService
@@ -17,7 +16,7 @@ class AwakeableIdTest : AwakeableIdTestSuite() {
1716

1817
override fun returnAwakeableId(): TestDefinitions.TestInvocationBuilder =
1918
testDefinitionForService("ReturnAwakeableId") { ctx, _: Unit ->
20-
val awakeable = ctx.awakeable(CoreSerdes.JSON_STRING)
19+
val awakeable: Awakeable<String> = ctx.awakeable()
2120
awakeable.id
2221
}
2322
}

sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/RandomTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class RandomTest : RandomTestSuite() {
2121

2222
override fun randomInsideSideEffect(): TestInvocationBuilder =
2323
testDefinitionForService<Unit, Int>("RandomInsideSideEffect") { ctx, _: Unit ->
24-
ctx.run { ctx.random().nextInt() }
24+
ctx.runBlock { ctx.random().nextInt() }
2525
throw IllegalStateException("This should not unreachable")
2626
}
2727

sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
99
package dev.restate.sdk.kotlin
1010

11-
import dev.restate.sdk.common.CoreSerdes
1211
import dev.restate.sdk.core.ProtoUtils.GREETER_SERVICE_TARGET
1312
import dev.restate.sdk.core.SideEffectTestSuite
1413
import dev.restate.sdk.core.TestDefinitions
@@ -23,15 +22,14 @@ class SideEffectTest : SideEffectTestSuite() {
2322

2423
override fun sideEffect(sideEffectOutput: String): TestInvocationBuilder =
2524
testDefinitionForService("SideEffect") { ctx, _: Unit ->
26-
val result = ctx.run(CoreSerdes.JSON_STRING) { sideEffectOutput }
25+
val result = ctx.runBlock { sideEffectOutput }
2726
"Hello $result"
2827
}
2928

3029
override fun consecutiveSideEffect(sideEffectOutput: String): TestInvocationBuilder =
3130
testDefinitionForService("ConsecutiveSideEffect") { ctx, _: Unit ->
32-
val firstResult = ctx.run(CoreSerdes.JSON_STRING) { sideEffectOutput }
33-
val secondResult =
34-
ctx.run(CoreSerdes.JSON_STRING) { firstResult.uppercase(Locale.getDefault()) }
31+
val firstResult = ctx.runBlock { sideEffectOutput }
32+
val secondResult = ctx.runBlock { firstResult.uppercase(Locale.getDefault()) }
3533
"Hello $secondResult"
3634
}
3735

@@ -42,8 +40,7 @@ class SideEffectTest : SideEffectTestSuite() {
4240
Component.Options(
4341
Dispatchers.Unconfined + CoroutineName("CheckContextSwitchingTestCoroutine"))) {
4442
handler("run") { ctx, _: Unit ->
45-
val sideEffectCoroutine =
46-
ctx.run(CoreSerdes.JSON_STRING) { coroutineContext[CoroutineName]!!.name }
43+
val sideEffectCoroutine = ctx.runBlock { coroutineContext[CoroutineName]!!.name }
4744
check(sideEffectCoroutine == "CheckContextSwitchingTestCoroutine") {
4845
"Side effect thread is not running within the same coroutine context of the handler method: $sideEffectCoroutine"
4946
}
@@ -54,7 +51,7 @@ class SideEffectTest : SideEffectTestSuite() {
5451

5552
override fun sideEffectGuard(): TestInvocationBuilder =
5653
testDefinitionForService<Unit, String>("SideEffectGuard") { ctx, _: Unit ->
57-
ctx.run { ctx.send(GREETER_SERVICE_TARGET, KtSerdes.json(), "something") }
54+
ctx.runBlock { ctx.send(GREETER_SERVICE_TARGET, KtSerdes.json(), "something") }
5855
throw IllegalStateException("This point should not be reached")
5956
}
6057
}

sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/StateMachineFailuresTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class StateMachineFailuresTest : StateMachineFailuresTestSuite() {
4747

4848
override fun sideEffectFailure(serde: Serde<Int>): TestInvocationBuilder =
4949
testDefinitionForService("SideEffectFailure") { ctx, _: Unit ->
50-
ctx.run(serde) { 0 }
50+
ctx.runBlock(serde) { 0 }
5151
"Francesco"
5252
}
5353
}

sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/UserFailuresTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class UserFailuresTest : UserFailuresTestSuite() {
2626
): TestInvocationBuilder =
2727
testDefinitionForService<Unit, Unit>("SideEffectThrowIllegalStateException") { ctx, _: Unit ->
2828
try {
29-
ctx.run { throw IllegalStateException("Whatever") }
29+
ctx.runBlock { throw IllegalStateException("Whatever") }
3030
} catch (e: Throwable) {
3131
if (e !is CancellationException && e !is TerminalException) {
3232
nonTerminalExceptionsSeen.addAndGet(1)
@@ -44,7 +44,7 @@ class UserFailuresTest : UserFailuresTestSuite() {
4444

4545
override fun sideEffectThrowTerminalException(code: Int, message: String): TestInvocationBuilder =
4646
testDefinitionForService<Unit, Unit>("SideEffectThrowTerminalException") { ctx, _: Unit ->
47-
ctx.run { throw TerminalException(code, message) }
47+
ctx.runBlock<Unit> { throw TerminalException(code, message) }
4848
throw IllegalStateException("Not expected to reach this point")
4949
}
5050
}

sdk-core/src/test/java/dev/restate/sdk/core/RandomTestSuite.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
99
package dev.restate.sdk.core;
1010

11-
import static dev.restate.sdk.core.AssertUtils.containsOnlyExactErrorMessage;
11+
import static dev.restate.sdk.core.AssertUtils.*;
1212
import static dev.restate.sdk.core.ProtoUtils.*;
1313

1414
import dev.restate.sdk.core.TestDefinitions.TestDefinition;
@@ -37,7 +37,7 @@ public Stream<TestDefinition> definitions() {
3737
this.randomInsideSideEffect()
3838
.withInput(startMessage(1).setDebugId(debugId), ProtoUtils.inputMessage())
3939
.assertingOutput(
40-
containsOnlyExactErrorMessage(
41-
new IllegalStateException("You can't use RestateRandom inside ctx.run!"))));
40+
containsOnly(
41+
errorMessageStartingWith(IllegalStateException.class.getCanonicalName()))));
4242
}
4343
}

0 commit comments

Comments
 (0)