Skip to content

Commit dd79547

Browse files
KeyedContext#clearAll (#217)
* Add `ctx.clearAll()` * Use `clearAll()` in WorkflowManager
1 parent 2635559 commit dd79547

File tree

16 files changed

+148
-6
lines changed

16 files changed

+148
-6
lines changed

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
6060
}
6161
}
6262

63+
override suspend fun clearAll() {
64+
return suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
65+
syscalls.clearAll(completingUnitContinuation(cont))
66+
}
67+
}
68+
6369
override suspend fun timer(duration: Duration): Awaitable<Unit> {
6470
val deferred: Deferred<Void> =
6571
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<Void>> ->

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ sealed interface KeyedContext : UnkeyedContext {
238238
*/
239239
suspend fun clear(key: StateKey<*>)
240240

241+
/** Clears all the state of this service instance key-value state storage */
242+
suspend fun clearAll()
243+
241244
companion object {
242245

243246
/**

sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/EagerStateTest.kt

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@ package dev.restate.sdk.kotlin
1111
import dev.restate.sdk.common.CoreSerdes
1212
import dev.restate.sdk.common.StateKey
1313
import dev.restate.sdk.core.EagerStateTestSuite
14-
import dev.restate.sdk.core.testservices.GreeterGrpcKt
15-
import dev.restate.sdk.core.testservices.GreetingRequest
16-
import dev.restate.sdk.core.testservices.GreetingResponse
17-
import dev.restate.sdk.core.testservices.greetingResponse
14+
import dev.restate.sdk.core.testservices.*
1815
import io.grpc.BindableService
1916
import kotlinx.coroutines.Dispatchers
2017
import org.assertj.core.api.AssertionsForClassTypes.assertThat
@@ -75,4 +72,22 @@ class EagerStateTest : EagerStateTestSuite() {
7572
override fun getClearAndGet(): BindableService {
7673
return GetClearAndGet()
7774
}
75+
76+
private class GetClearAllAndGet : GreeterRestateKt.GreeterRestateKtImplBase() {
77+
override suspend fun greet(context: KeyedContext, request: GreetingRequest): GreetingResponse {
78+
val ctx = KeyedContext.current()
79+
val oldState = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING))!!
80+
81+
ctx.clearAll()
82+
83+
assertThat(ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING))).isNull()
84+
assertThat(ctx.get(StateKey.of("ANOTHER_STATE", CoreSerdes.JSON_STRING))).isNull()
85+
86+
return greetingResponse { message = oldState }
87+
}
88+
}
89+
90+
override fun getClearAllAndGet(): BindableService {
91+
return GetClearAllAndGet()
92+
}
7893
}

sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public void clear(StateKey<?> key) {
4848
Util.<Void>blockOnSyscall(cb -> syscalls.clear(key.name(), cb));
4949
}
5050

51+
@Override
52+
public void clearAll() {
53+
Util.<Void>blockOnSyscall(syscalls::clearAll);
54+
}
55+
5156
@Override
5257
public <T> void set(StateKey<T> key, @Nonnull T value) {
5358
Util.<Void>blockOnSyscall(

sdk-api/src/main/java/dev/restate/sdk/KeyedContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public interface KeyedContext extends UnkeyedContext {
4141
*/
4242
void clear(StateKey<?> key);
4343

44+
/** Clears all the state of this service instance key-value state storage */
45+
void clearAll();
46+
4447
/**
4548
* Sets the given value under the given key, serializing the value using the {@link Serde} in the
4649
* {@link StateKey}.

sdk-api/src/test/java/dev/restate/sdk/EagerStateTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,26 @@ public void greet(GreetingRequest request, StreamObserver<GreetingResponse> resp
9797
protected BindableService getClearAndGet() {
9898
return new GetClearAndGet();
9999
}
100+
101+
private static class GetClearAllAndGet extends GreeterGrpc.GreeterImplBase
102+
implements RestateService {
103+
@Override
104+
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
105+
KeyedContext ctx = KeyedContext.current();
106+
107+
String oldState = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING)).get();
108+
109+
ctx.clearAll();
110+
assertThat(ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING))).isEmpty();
111+
assertThat(ctx.get(StateKey.of("ANOTHER_STATE", CoreSerdes.JSON_STRING))).isEmpty();
112+
113+
responseObserver.onNext(GreetingResponse.newBuilder().setMessage(oldState).build());
114+
responseObserver.onCompleted();
115+
}
116+
}
117+
118+
@Override
119+
protected BindableService getClearAllAndGet() {
120+
return new GetClearAllAndGet();
121+
}
100122
}

sdk-common/src/main/java/dev/restate/sdk/common/syscalls/Syscalls.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ static Syscalls current() {
6363

6464
void clear(String name, SyscallCallback<Void> callback);
6565

66+
void clearAll(SyscallCallback<Void> callback);
67+
6668
void set(String name, ByteString value, SyscallCallback<Void> callback);
6769

6870
// ----- Syscalls

sdk-core/src/main/java/dev/restate/sdk/core/Entries.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,30 @@ void updateUserStateStoreWithEntry(
208208
}
209209
}
210210

211+
static final class ClearAllStateEntry extends JournalEntry<ClearAllStateEntryMessage> {
212+
213+
static final ClearAllStateEntry INSTANCE = new ClearAllStateEntry();
214+
215+
private ClearAllStateEntry() {}
216+
217+
@Override
218+
public void trace(ClearAllStateEntryMessage expected, Span span) {
219+
span.addEvent("ClearAllState");
220+
}
221+
222+
@Override
223+
void checkEntryHeader(ClearAllStateEntryMessage expected, MessageLite actual)
224+
throws ProtocolException {
225+
Util.assertEntryEquals(expected, actual);
226+
}
227+
228+
@Override
229+
void updateUserStateStoreWithEntry(
230+
ClearAllStateEntryMessage expected, UserStateStore userStateStore) {
231+
userStateStore.clearAll();
232+
}
233+
}
234+
211235
static final class SetStateEntry extends JournalEntry<SetStateEntryMessage> {
212236

213237
static final SetStateEntry INSTANCE = new SetStateEntry();

sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingSyscalls.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ public void clear(String name, SyscallCallback<Void> callback) {
5555
syscallsExecutor.execute(() -> syscalls.clear(name, callback));
5656
}
5757

58+
@Override
59+
public void clearAll(SyscallCallback<Void> callback) {
60+
syscallsExecutor.execute(() -> syscalls.clearAll(callback));
61+
}
62+
5863
@Override
5964
public void set(String name, ByteString value, SyscallCallback<Void> callback) {
6065
syscallsExecutor.execute(() -> syscalls.set(name, value, callback));

sdk-core/src/main/java/dev/restate/sdk/core/MessageHeader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public static MessageHeader fromMessage(MessageLite msg) {
8585
return new MessageHeader(MessageType.SetStateEntryMessage, 0, msg.getSerializedSize());
8686
} else if (msg instanceof Protocol.ClearStateEntryMessage) {
8787
return new MessageHeader(MessageType.ClearStateEntryMessage, 0, msg.getSerializedSize());
88+
} else if (msg instanceof Protocol.ClearAllStateEntryMessage) {
89+
return new MessageHeader(MessageType.ClearAllStateEntryMessage, 0, msg.getSerializedSize());
8890
} else if (msg instanceof Protocol.SleepEntryMessage) {
8991
return new MessageHeader(
9092
MessageType.SleepEntryMessage,

0 commit comments

Comments
 (0)