Skip to content

Commit e99f4f3

Browse files
Use the new side effect entry shape
1 parent eaf2eb9 commit e99f4f3

File tree

10 files changed

+32
-46
lines changed

10 files changed

+32
-46
lines changed

sdk-core/src/main/java/dev/restate/sdk/core/InvocationStateMachine.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -412,10 +412,10 @@ void enterSideEffectBlock(String name, EnterSideEffectSyscallCallback callback)
412412
// Retrieve the entry
413413
this.readEntry(
414414
msg -> {
415-
Util.assertEntryClass(Java.SideEffectEntryMessage.class, msg);
415+
Util.assertEntryClass(Protocol.SideEffectEntryMessage.class, msg);
416416

417417
// We have a result already, complete the callback
418-
completeSideEffectCallbackWithEntry((Java.SideEffectEntryMessage) msg, callback);
418+
completeSideEffectCallbackWithEntry((Protocol.SideEffectEntryMessage) msg, callback);
419419
},
420420
callback::onCancel);
421421
} else if (this.invocationState == InvocationState.PROCESSING) {
@@ -431,7 +431,7 @@ void enterSideEffectBlock(String name, EnterSideEffectSyscallCallback callback)
431431
}
432432

433433
void exitSideEffectBlock(
434-
Java.SideEffectEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) {
434+
Protocol.SideEffectEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) {
435435
this.insideSideEffect = false;
436436
if (this.invocationState == InvocationState.CLOSED) {
437437
callback.onCancel(AbortedExecutionException.INSTANCE);
@@ -454,7 +454,7 @@ void exitSideEffectBlock(
454454
this.writeEntry(sideEffectEntry);
455455

456456
// Wait for entry to be acked
457-
Java.SideEffectEntryMessage finalSideEffectEntry = sideEffectEntry;
457+
Protocol.SideEffectEntryMessage finalSideEffectEntry = sideEffectEntry;
458458
this.sideEffectAckStateMachine.waitLastSideEffectAck(
459459
new SideEffectAckStateMachine.SideEffectAckCallback() {
460460
@Override
@@ -480,7 +480,7 @@ public void onError(Throwable e) {
480480
}
481481

482482
void completeSideEffectCallbackWithEntry(
483-
Java.SideEffectEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) {
483+
Protocol.SideEffectEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) {
484484
if (sideEffectEntry.hasFailure()) {
485485
callback.onFailure(Util.toRestateException(sideEffectEntry.getFailure()));
486486
} else {

sdk-core/src/main/java/dev/restate/sdk/core/MessageHeader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package dev.restate.sdk.core;
1010

1111
import com.google.protobuf.MessageLite;
12-
import dev.restate.generated.sdk.java.Java;
1312
import dev.restate.generated.service.protocol.Protocol;
1413

1514
public class MessageHeader {
@@ -95,7 +94,7 @@ public static MessageHeader fromMessage(MessageLite msg) {
9594
? DONE_FLAG
9695
: 0,
9796
msg.getSerializedSize());
98-
} else if (msg instanceof Java.SideEffectEntryMessage) {
97+
} else if (msg instanceof Protocol.SideEffectEntryMessage) {
9998
return new MessageHeader(
10099
MessageType.SideEffectEntryMessage, REQUIRES_ACK_FLAG, msg.getSerializedSize());
101100
}

sdk-core/src/main/java/dev/restate/sdk/core/MessageType.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public enum MessageType {
6262
public static final short AWAKEABLE_ENTRY_MESSAGE_TYPE = 0x0C03;
6363
public static final short COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE = 0x0C04;
6464
public static final short COMBINATOR_AWAITABLE_ENTRY_MESSAGE_TYPE = (short) 0xFC00;
65-
public static final short SIDE_EFFECT_ENTRY_MESSAGE_TYPE = (short) 0xFC01;
65+
public static final short SIDE_EFFECT_ENTRY_MESSAGE_TYPE = (short) 0x0C05;
6666

6767
public Parser<? extends MessageLite> messageParser() {
6868
switch (this) {
@@ -105,7 +105,7 @@ public Parser<? extends MessageLite> messageParser() {
105105
case CombinatorAwaitableEntryMessage:
106106
return Java.CombinatorAwaitableEntryMessage.parser();
107107
case SideEffectEntryMessage:
108-
return Java.SideEffectEntryMessage.parser();
108+
return Protocol.SideEffectEntryMessage.parser();
109109
}
110110
throw new IllegalStateException();
111111
}
@@ -237,7 +237,7 @@ public static MessageType fromMessage(MessageLite msg) {
237237
return MessageType.CompleteAwakeableEntryMessage;
238238
} else if (msg instanceof Java.CombinatorAwaitableEntryMessage) {
239239
return MessageType.CombinatorAwaitableEntryMessage;
240-
} else if (msg instanceof Java.SideEffectEntryMessage) {
240+
} else if (msg instanceof Protocol.SideEffectEntryMessage) {
241241
return MessageType.SideEffectEntryMessage;
242242
} else if (msg instanceof Protocol.CompletionMessage) {
243243
throw new IllegalArgumentException("SDK should never send a CompletionMessage");

sdk-core/src/main/java/dev/restate/sdk/core/SyscallsImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package dev.restate.sdk.core;
1010

1111
import com.google.protobuf.ByteString;
12-
import dev.restate.generated.sdk.java.Java;
1312
import dev.restate.generated.service.protocol.Protocol;
1413
import dev.restate.sdk.common.Request;
1514
import dev.restate.sdk.common.Target;
@@ -234,7 +233,7 @@ public void exitSideEffectBlock(ByteString toWrite, ExitSideEffectSyscallCallbac
234233
() -> {
235234
LOG.trace("exitSideEffectBlock with success");
236235
this.stateMachine.exitSideEffectBlock(
237-
Java.SideEffectEntryMessage.newBuilder().setValue(toWrite).build(), callback);
236+
Protocol.SideEffectEntryMessage.newBuilder().setValue(toWrite).build(), callback);
238237
},
239238
callback);
240239
}
@@ -246,7 +245,7 @@ public void exitSideEffectBlockWithTerminalException(
246245
() -> {
247246
LOG.trace("exitSideEffectBlock with failure");
248247
this.stateMachine.exitSideEffectBlock(
249-
Java.SideEffectEntryMessage.newBuilder()
248+
Protocol.SideEffectEntryMessage.newBuilder()
250249
.setFailure(Util.toProtocolFailure(toWrite))
251250
.build(),
252251
callback);

sdk-core/src/main/java/dev/restate/sdk/core/Util.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,6 @@ static boolean isEntry(MessageLite msg) {
154154
|| msg instanceof Protocol.AwakeableEntryMessage
155155
|| msg instanceof Protocol.CompleteAwakeableEntryMessage
156156
|| msg instanceof Java.CombinatorAwaitableEntryMessage
157-
|| msg instanceof Java.SideEffectEntryMessage;
157+
|| msg instanceof Protocol.SideEffectEntryMessage;
158158
}
159159
}

sdk-core/src/main/sdk-proto/dev/restate/sdk/java.proto

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,3 @@ message CombinatorAwaitableEntryMessage {
2222
// Entry name
2323
string name = 12;
2424
}
25-
26-
// Type: 0xFC00 + 1
27-
// Flag: RequiresRuntimeAck
28-
message SideEffectEntryMessage {
29-
oneof result {
30-
bytes value = 14;
31-
dev.restate.service.protocol.Failure failure = 15;
32-
};
33-
34-
// Entry name
35-
string name = 12;
36-
}

sdk-core/src/test/java/dev/restate/sdk/core/SideEffectTestSuite.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import static org.assertj.core.api.InstanceOfAssertFactories.STRING;
1616
import static org.assertj.core.api.InstanceOfAssertFactories.type;
1717

18-
import dev.restate.generated.sdk.java.Java;
1918
import dev.restate.generated.service.protocol.Protocol;
2019
import dev.restate.sdk.common.CoreSerdes;
2120
import dev.restate.sdk.common.TerminalException;
@@ -41,49 +40,49 @@ public Stream<TestDefinitions.TestDefinition> definitions() {
4140
this.sideEffect("Francesco")
4241
.withInput(startMessage(1), inputMessage("Till"))
4342
.expectingOutput(
44-
Java.SideEffectEntryMessage.newBuilder()
43+
Protocol.SideEffectEntryMessage.newBuilder()
4544
.setValue(CoreSerdes.JSON_STRING.serializeToByteString("Francesco")),
4645
suspensionMessage(1))
4746
.named("Without optimization suspends"),
4847
this.sideEffect("Francesco")
4948
.withInput(startMessage(1), inputMessage("Till"), ackMessage(1))
5049
.expectingOutput(
51-
Java.SideEffectEntryMessage.newBuilder()
50+
Protocol.SideEffectEntryMessage.newBuilder()
5251
.setValue(CoreSerdes.JSON_STRING.serializeToByteString("Francesco")),
5352
outputMessage("Hello Francesco"),
5453
END_MESSAGE)
5554
.named("Without optimization and with acks returns"),
5655
this.namedSideEffect("get-my-name", "Francesco")
5756
.withInput(startMessage(1), inputMessage("Till"))
5857
.expectingOutput(
59-
Java.SideEffectEntryMessage.newBuilder()
58+
Protocol.SideEffectEntryMessage.newBuilder()
6059
.setName("get-my-name")
6160
.setValue(CoreSerdes.JSON_STRING.serializeToByteString("Francesco")),
6261
suspensionMessage(1)),
6362
this.consecutiveSideEffect("Francesco")
6463
.withInput(startMessage(1), inputMessage("Till"))
6564
.expectingOutput(
66-
Java.SideEffectEntryMessage.newBuilder()
65+
Protocol.SideEffectEntryMessage.newBuilder()
6766
.setValue(CoreSerdes.JSON_STRING.serializeToByteString("Francesco")),
6867
suspensionMessage(1))
6968
.named("With optimization and without ack on first side effect will suspend"),
7069
this.consecutiveSideEffect("Francesco")
7170
.withInput(startMessage(1), inputMessage("Till"), ackMessage(1))
7271
.onlyUnbuffered()
7372
.expectingOutput(
74-
Java.SideEffectEntryMessage.newBuilder()
73+
Protocol.SideEffectEntryMessage.newBuilder()
7574
.setValue(CoreSerdes.JSON_STRING.serializeToByteString("Francesco")),
76-
Java.SideEffectEntryMessage.newBuilder()
75+
Protocol.SideEffectEntryMessage.newBuilder()
7776
.setValue(CoreSerdes.JSON_STRING.serializeToByteString("FRANCESCO")),
7877
suspensionMessage(2))
7978
.named("With optimization and ack on first side effect will suspend"),
8079
this.consecutiveSideEffect("Francesco")
8180
.withInput(startMessage(1), inputMessage("Till"), ackMessage(1), ackMessage(2))
8281
.onlyUnbuffered()
8382
.expectingOutput(
84-
Java.SideEffectEntryMessage.newBuilder()
83+
Protocol.SideEffectEntryMessage.newBuilder()
8584
.setValue(CoreSerdes.JSON_STRING.serializeToByteString("Francesco")),
86-
Java.SideEffectEntryMessage.newBuilder()
85+
Protocol.SideEffectEntryMessage.newBuilder()
8786
.setValue(CoreSerdes.JSON_STRING.serializeToByteString("FRANCESCO")),
8887
outputMessage("Hello FRANCESCO"),
8988
END_MESSAGE)
@@ -117,8 +116,8 @@ public Stream<TestDefinitions.TestDefinition> definitions() {
117116
assertThat(actualOutputMessages).hasSize(3);
118117
assertThat(actualOutputMessages)
119118
.element(0)
120-
.asInstanceOf(type(Java.SideEffectEntryMessage.class))
121-
.returns(true, Java.SideEffectEntryMessage::hasValue);
119+
.asInstanceOf(type(Protocol.SideEffectEntryMessage.class))
120+
.returns(true, Protocol.SideEffectEntryMessage::hasValue);
122121
assertThat(actualOutputMessages).element(1).isEqualTo(outputMessage("Hello"));
123122
assertThat(actualOutputMessages).element(2).isEqualTo(END_MESSAGE);
124123
}),

sdk-core/src/test/java/dev/restate/sdk/core/StateMachineFailuresTestSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import static dev.restate.sdk.core.TestDefinitions.TestInvocationBuilder;
1515
import static org.assertj.core.api.Assertions.assertThat;
1616

17-
import dev.restate.generated.sdk.java.Java;
17+
import dev.restate.generated.service.protocol.Protocol;
1818
import dev.restate.sdk.common.Serde;
1919
import java.nio.charset.StandardCharsets;
2020
import java.util.concurrent.atomic.AtomicInteger;
@@ -78,7 +78,7 @@ public Stream<TestDefinitions.TestDefinition> definitions() {
7878
.named("Serde serialization error"),
7979
this.sideEffectFailure(FAILING_DESERIALIZATION_INTEGER_TYPE_TAG)
8080
.withInput(
81-
startMessage(2), inputMessage("Till"), Java.SideEffectEntryMessage.newBuilder())
81+
startMessage(2), inputMessage("Till"), Protocol.SideEffectEntryMessage.newBuilder())
8282
.assertingOutput(
8383
AssertUtils.containsOnly(
8484
errorMessageStartingWith(IllegalStateException.class.getCanonicalName())))

sdk-core/src/test/java/dev/restate/sdk/core/UserFailuresTestSuite.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import static dev.restate.sdk.core.TestDefinitions.*;
1515
import static org.assertj.core.api.Assertions.assertThat;
1616

17-
import dev.restate.generated.sdk.java.Java;
17+
import dev.restate.generated.service.protocol.Protocol;
1818
import dev.restate.sdk.common.TerminalException;
1919
import java.util.concurrent.atomic.AtomicInteger;
2020
import java.util.stream.Stream;
@@ -69,7 +69,7 @@ public Stream<TestDefinition> definitions() {
6969
TerminalException.INTERNAL_SERVER_ERROR_CODE, MY_ERROR)
7070
.withInput(startMessage(1), inputMessage(), ackMessage(1))
7171
.expectingOutput(
72-
Java.SideEffectEntryMessage.newBuilder()
72+
Protocol.SideEffectEntryMessage.newBuilder()
7373
.setFailure(
7474
Util.toProtocolFailure(
7575
TerminalException.INTERNAL_SERVER_ERROR_CODE, MY_ERROR)),
@@ -79,7 +79,7 @@ public Stream<TestDefinition> definitions() {
7979
this.sideEffectThrowTerminalException(501, WHATEVER)
8080
.withInput(startMessage(1), inputMessage(), ackMessage(1))
8181
.expectingOutput(
82-
Java.SideEffectEntryMessage.newBuilder()
82+
Protocol.SideEffectEntryMessage.newBuilder()
8383
.setFailure(Util.toProtocolFailure(501, WHATEVER)),
8484
outputMessage(501, WHATEVER),
8585
END_MESSAGE)

sdk-http-vertx/src/test/kotlin/dev/restate/sdk/http/vertx/VertxExecutorsTest.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@
99
package dev.restate.sdk.http.vertx
1010

1111
import com.google.protobuf.ByteString
12-
import dev.restate.generated.sdk.java.Java
12+
import dev.restate.generated.service.protocol.Protocol
1313
import dev.restate.sdk.common.CoreSerdes
1414
import dev.restate.sdk.core.ProtoUtils.*
1515
import dev.restate.sdk.core.TestDefinitions
1616
import dev.restate.sdk.core.TestDefinitions.testInvocation
17-
import dev.restate.sdk.kotlin.*
17+
import dev.restate.sdk.kotlin.Component
18+
import dev.restate.sdk.kotlin.runBlock
1819
import io.vertx.core.Vertx
1920
import java.util.stream.Stream
2021
import kotlin.coroutines.coroutineContext
@@ -75,7 +76,7 @@ class VertxExecutorsTest : TestDefinitions.TestSuite {
7576
.withInput(startMessage(1), inputMessage(), ackMessage(1))
7677
.onlyUnbuffered()
7778
.expectingOutput(
78-
Java.SideEffectEntryMessage.newBuilder().setValue(ByteString.EMPTY),
79+
Protocol.SideEffectEntryMessage.newBuilder().setValue(ByteString.EMPTY),
7980
outputMessage(),
8081
END_MESSAGE),
8182
testInvocation(
@@ -89,7 +90,7 @@ class VertxExecutorsTest : TestDefinitions.TestSuite {
8990
.withInput(startMessage(1), inputMessage(), ackMessage(1))
9091
.onlyUnbuffered()
9192
.expectingOutput(
92-
Java.SideEffectEntryMessage.newBuilder().setValue(ByteString.EMPTY),
93+
Protocol.SideEffectEntryMessage.newBuilder().setValue(ByteString.EMPTY),
9394
outputMessage(),
9495
END_MESSAGE))
9596
}

0 commit comments

Comments
 (0)