Skip to content

Commit cae0d3b

Browse files
Add resolve/reject awakeables from ingress. Fix #246 (#254)
1 parent 4205690 commit cae0d3b

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.net.http.HttpResponse;
2323
import java.util.Map;
2424
import java.util.concurrent.CompletableFuture;
25+
import org.jspecify.annotations.NonNull;
2526

2627
public class DefaultIngressClient implements IngressClient {
2728

@@ -92,6 +93,75 @@ public <Req> CompletableFuture<String> sendAsync(
9293
});
9394
}
9495

96+
@Override
97+
public AwakeableHandle awakeableHandle(String id) {
98+
return new AwakeableHandle() {
99+
@Override
100+
public <T> CompletableFuture<Void> resolve(Serde<T> serde, @NonNull T payload) {
101+
// Prepare request
102+
var reqBuilder =
103+
HttpRequest.newBuilder().uri(URI.create("/restate/awakeables/" + id + "/resolve"));
104+
105+
// Add content-type
106+
if (serde.contentType() != null) {
107+
reqBuilder.header("content-type", serde.contentType());
108+
}
109+
110+
// Add headers
111+
headers.forEach(reqBuilder::header);
112+
113+
// Build and Send request
114+
HttpRequest request =
115+
reqBuilder
116+
.POST(HttpRequest.BodyPublishers.ofByteArray(serde.serialize(payload)))
117+
.build();
118+
return httpClient
119+
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
120+
.handle(
121+
(response, throwable) -> {
122+
if (throwable != null) {
123+
throw new IngressException("Error when executing the request", throwable);
124+
}
125+
126+
if (response.statusCode() >= 300) {
127+
handleNonSuccessResponse(response);
128+
}
129+
130+
return null;
131+
});
132+
}
133+
134+
@Override
135+
public CompletableFuture<Void> reject(String reason) {
136+
// Prepare request
137+
var reqBuilder =
138+
HttpRequest.newBuilder()
139+
.uri(URI.create("/restate/awakeables/" + id + "/reject"))
140+
.header("content-type", "text-plain");
141+
142+
// Add headers
143+
headers.forEach(reqBuilder::header);
144+
145+
// Build and Send request
146+
HttpRequest request = reqBuilder.POST(HttpRequest.BodyPublishers.ofString(reason)).build();
147+
return httpClient
148+
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
149+
.handle(
150+
(response, throwable) -> {
151+
if (throwable != null) {
152+
throw new IngressException("Error when executing the request", throwable);
153+
}
154+
155+
if (response.statusCode() >= 300) {
156+
handleNonSuccessResponse(response);
157+
}
158+
159+
return null;
160+
});
161+
}
162+
};
163+
}
164+
95165
private URI toRequestURI(Target target, boolean isSend) {
96166
StringBuilder builder = new StringBuilder();
97167
builder.append("/").append(target.getComponent());

sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.Map;
1616
import java.util.concurrent.CompletableFuture;
1717
import java.util.concurrent.CompletionException;
18+
import org.jspecify.annotations.NonNull;
1819

1920
public interface IngressClient {
2021

@@ -67,6 +68,34 @@ default <Req> String send(Target target, Serde<Req> reqSerde, Req req) throws In
6768
return send(target, reqSerde, req, RequestOptions.DEFAULT);
6869
}
6970

71+
/**
72+
* Create a new {@link AwakeableHandle} for the provided identifier. You can use it to {@link
73+
* AwakeableHandle#resolve(Serde, Object)} or {@link AwakeableHandle#reject(String)} an Awakeable
74+
* from the ingress.
75+
*/
76+
AwakeableHandle awakeableHandle(String id);
77+
78+
/**
79+
* This class represents a handle to an Awakeable. It can be used to complete awakeables from the
80+
* ingress
81+
*/
82+
interface AwakeableHandle {
83+
/**
84+
* Complete with success the Awakeable.
85+
*
86+
* @param serde used to serialize the Awakeable result payload.
87+
* @param payload the result payload. MUST NOT be null.
88+
*/
89+
<T> CompletableFuture<Void> resolve(Serde<T> serde, @NonNull T payload);
90+
91+
/**
92+
* Complete with failure the Awakeable.
93+
*
94+
* @param reason the rejection reason. MUST NOT be null.
95+
*/
96+
CompletableFuture<Void> reject(String reason);
97+
}
98+
7099
static IngressClient defaultClient(String baseUri) {
71100
return defaultClient(baseUri, Collections.emptyMap());
72101
}

0 commit comments

Comments
 (0)