Skip to content

Commit 95bbd2e

Browse files
Fix logging context propagation (#185)
* Improve the Context propagation logic: * This is now more obvious, in particular It makes more clear what's the boundary between state machine and user code and how the Context is propagated * Renamed the methods of RestateServerCallListener to make more clear their goal * Propagate fully qualified method name and invocation state in SyscallsInternal * Reintroduce the GrpcContextDataProvider * Fix the LoggingContextSetter.THREAD_LOCAL_INSTANCE * Bump log4j2
1 parent f0981d2 commit 95bbd2e

16 files changed

+233
-106
lines changed

sdk-core/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ dependencies {
1717
implementation(coreLibs.grpc.protobuf)
1818
implementation(coreLibs.log4j.api)
1919

20+
// We don't want a hard-dependency on it
21+
compileOnly(coreLibs.log4j.core)
22+
2023
implementation(platform(coreLibs.opentelemetry.bom))
2124
implementation(coreLibs.opentelemetry.api)
2225
implementation(coreLibs.opentelemetry.semconv)

sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingWrappers.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import com.google.protobuf.ByteString;
1212
import com.google.protobuf.MessageLite;
1313
import dev.restate.sdk.common.TerminalException;
14-
import dev.restate.sdk.common.syscalls.*;
1514
import dev.restate.sdk.common.syscalls.DeferredResult;
1615
import dev.restate.sdk.common.syscalls.EnterSideEffectSyscallCallback;
1716
import dev.restate.sdk.common.syscalls.ExitSideEffectSyscallCallback;
@@ -48,13 +47,13 @@ private ExecutorSwitchingServerCallListener(
4847
}
4948

5049
@Override
51-
public void onMessageAndHalfClose(MessageLite message) {
52-
userExecutor.execute(() -> listener.onMessageAndHalfClose(message));
50+
public void invoke(MessageLite message) {
51+
userExecutor.execute(() -> listener.invoke(message));
5352
}
5453

5554
// A bit of explanation why the following methods are not executed on the user executor.
5655
//
57-
// The listener methods onReady/onCancel/onComplete are used purely for notification reasons,
56+
// The listener methods listenerReady/cancel/close are used purely for notification reasons,
5857
// they don't execute any user code.
5958
//
6059
// Running them in the userExecutor can also be problematic if the listener
@@ -64,18 +63,18 @@ public void onMessageAndHalfClose(MessageLite message) {
6463
// as thread local.
6564

6665
@Override
67-
public void onCancel() {
68-
listener.onCancel();
66+
public void cancel() {
67+
listener.cancel();
6968
}
7069

7170
@Override
72-
public void onComplete() {
73-
listener.onComplete();
71+
public void close() {
72+
listener.close();
7473
}
7574

7675
@Override
77-
public void onReady() {
78-
listener.onReady();
76+
public void listenerReady() {
77+
listener.listenerReady();
7978
}
8079
}
8180

@@ -182,6 +181,18 @@ public <T> void resolveDeferred(
182181
syscallsExecutor.execute(() -> syscalls.resolveDeferred(deferredToResolve, callback));
183182
}
184183

184+
@Override
185+
public String getFullyQualifiedMethodName() {
186+
// We can read this from another thread
187+
return syscalls.getFullyQualifiedMethodName();
188+
}
189+
190+
@Override
191+
public InvocationState getInvocationState() {
192+
// We can read this from another thread
193+
return syscalls.getInvocationState();
194+
}
195+
185196
@Override
186197
public void close() {
187198
syscallsExecutor.execute(syscalls::close);
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate Java SDK,
4+
// which is released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
9+
package dev.restate.sdk.core;
10+
11+
import dev.restate.sdk.common.InvocationId;
12+
import java.util.Collections;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
import org.apache.logging.log4j.core.util.ContextDataProvider;
16+
17+
/**
18+
* Log4j2 ContextDataProvider inferring context from the Grpc context.
19+
*
20+
* <p>This is used to propagate the context to the user code, such that log statements from the user
21+
* will contain the restate logging context variables.
22+
*
23+
* <p>This is based on grpc Context due to the fact that it's the only guaranteed thread
24+
* local/context we can rely on that is always available in the user code.
25+
*/
26+
public class GrpcContextDataProvider implements ContextDataProvider {
27+
@Override
28+
public Map<String, String> supplyContextData() {
29+
InvocationId invocationId = InvocationId.INVOCATION_ID_KEY.get();
30+
SyscallsInternal syscalls = (SyscallsInternal) SyscallsInternal.SYSCALLS_KEY.get();
31+
32+
if (invocationId == null) {
33+
return Collections.emptyMap();
34+
}
35+
36+
// We can't use the immutable MapN implementation from Map.of because of
37+
// https://github.com/apache/logging-log4j2/issues/2098
38+
HashMap<String, String> m = new HashMap<>();
39+
m.put(RestateGrpcServer.LoggingContextSetter.INVOCATION_ID_KEY, invocationId.toString());
40+
m.put(
41+
RestateGrpcServer.LoggingContextSetter.SERVICE_METHOD_KEY,
42+
syscalls.getFullyQualifiedMethodName());
43+
m.put(
44+
RestateGrpcServer.LoggingContextSetter.SERVICE_INVOCATION_STATUS_KEY,
45+
syscalls.getInvocationState().toString());
46+
return m;
47+
}
48+
}

sdk-core/src/main/java/dev/restate/sdk/core/GrpcServerCallListenerAdaptor.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@
88
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
99
package dev.restate.sdk.core;
1010

11-
import io.grpc.Metadata;
12-
import io.grpc.ServerCall;
13-
import io.grpc.Status;
11+
import io.grpc.*;
1412
import org.apache.logging.log4j.LogManager;
1513
import org.apache.logging.log4j.Logger;
1614

@@ -24,50 +22,75 @@ class GrpcServerCallListenerAdaptor<ReqT, RespT> implements RestateServerCallLis
2422

2523
private static final Logger LOG = LogManager.getLogger(GrpcServerCallListenerAdaptor.class);
2624

25+
private final Context context;
2726
private final ServerCall<ReqT, RespT> serverCall;
28-
2927
private final ServerCall.Listener<ReqT> delegate;
3028

3129
GrpcServerCallListenerAdaptor(
32-
ServerCall.Listener<ReqT> delegate, ServerCall<ReqT, RespT> serverCall) {
33-
this.delegate = delegate;
30+
Context context,
31+
ServerCall<ReqT, RespT> serverCall,
32+
Metadata headers,
33+
ServerCallHandler<ReqT, RespT> next) {
34+
this.context = context;
3435
this.serverCall = serverCall;
36+
37+
// This emulates Contexts.interceptCall.
38+
// We need it because some code generator depends on the fact that startCall already has the
39+
// context available
40+
Context previous = this.context.attach();
41+
try {
42+
this.delegate = next.startCall(serverCall, headers);
43+
} finally {
44+
this.context.detach(previous);
45+
}
3546
}
3647

3748
@Override
38-
public void onMessageAndHalfClose(ReqT message) {
49+
public void invoke(ReqT message) {
50+
Context previous = context.attach();
3951
try {
4052
delegate.onMessage(message);
4153
delegate.onHalfClose();
4254
} catch (Throwable e) {
4355
closeWithException(e);
56+
} finally {
57+
context.detach(previous);
4458
}
4559
}
4660

4761
@Override
48-
public void onCancel() {
62+
public void cancel() {
63+
Context previous = context.attach();
4964
try {
5065
delegate.onCancel();
5166
} catch (Throwable e) {
5267
closeWithException(e);
68+
} finally {
69+
context.detach(previous);
5370
}
5471
}
5572

5673
@Override
57-
public void onComplete() {
74+
public void close() {
75+
Context previous = context.attach();
5876
try {
5977
delegate.onComplete();
6078
} catch (Throwable e) {
6179
closeWithException(e);
80+
} finally {
81+
context.detach(previous);
6282
}
6383
}
6484

6585
@Override
66-
public void onReady() {
86+
public void listenerReady() {
87+
Context previous = context.attach();
6788
try {
6889
delegate.onReady();
6990
} catch (Throwable e) {
7091
closeWithException(e);
92+
} finally {
93+
context.detach(previous);
7194
}
7295
}
7396

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate Java SDK,
4+
// which is released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
9+
package dev.restate.sdk.core;
10+
11+
public enum InvocationState {
12+
WAITING_START,
13+
REPLAYING,
14+
PROCESSING,
15+
CLOSED;
16+
}

0 commit comments

Comments
 (0)