Skip to content

Commit 69ec01a

Browse files
committed
GH-399 - Avoid event publication completion for failed CompletableFutures.
We now explicitly handle CompletableFuture instances returned from transactional event listeners by registering the completion handlers only on success, the debug logging on failure and immediately return the decorated instance. Previously, a failed CompletableFuture instance would still have the publication marked completed as it doesn't cause any exception being thrown and thus ultimately ending up in the code path that issues the completion.
1 parent 768103f commit 69ec01a

File tree

2 files changed

+88
-12
lines changed

2 files changed

+88
-12
lines changed

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/CompletionRegisteringAdvisor.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.modulith.events.support;
1717

1818
import java.lang.reflect.Method;
19+
import java.util.concurrent.CompletableFuture;
1920
import java.util.function.Supplier;
2021

2122
import org.aopalliance.aop.Advice;
@@ -31,6 +32,7 @@
3132
import org.springframework.core.Ordered;
3233
import org.springframework.core.annotation.AnnotatedElementUtils;
3334
import org.springframework.lang.NonNull;
35+
import org.springframework.lang.Nullable;
3436
import org.springframework.modulith.events.core.EventPublicationRegistry;
3537
import org.springframework.modulith.events.core.PublicationTargetIdentifier;
3638
import org.springframework.transaction.event.TransactionPhase;
@@ -164,25 +166,30 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
164166

165167
Object result = null;
166168
var method = invocation.getMethod();
169+
var argument = invocation.getArguments()[0];
167170

168171
try {
172+
169173
result = invocation.proceed();
170-
} catch (Exception o_O) {
171174

172-
if (LOG.isDebugEnabled()) {
173-
LOG.debug("Invocation of listener {} failed. Leaving event publication uncompleted.", method, o_O);
174-
} else {
175-
LOG.info("Invocation of listener {} failed with message {}. Leaving event publication uncompleted.",
176-
method, o_O.getMessage());
175+
if (result instanceof CompletableFuture<?> future) {
176+
177+
return future
178+
.thenAccept(it -> markCompleted(method, argument))
179+
.exceptionallyCompose(it -> {
180+
handleFailure(method, it);
181+
return CompletableFuture.failedFuture(it);
182+
});
177183
}
178184

185+
} catch (Throwable o_O) {
186+
187+
handleFailure(method, o_O);
188+
179189
throw o_O;
180190
}
181191

182-
// Mark publication complete if the method is a transactional event listener.
183-
String adapterId = ADAPTERS.get(method).getListenerId();
184-
PublicationTargetIdentifier identifier = PublicationTargetIdentifier.of(adapterId);
185-
registry.get().markCompleted(invocation.getArguments()[0], identifier);
192+
markCompleted(method, argument);
186193

187194
return result;
188195
}
@@ -196,6 +203,27 @@ public int getOrder() {
196203
return Ordered.HIGHEST_PRECEDENCE + 10;
197204
}
198205

206+
@Nullable
207+
private static Void handleFailure(Method method, Throwable o_O) {
208+
209+
if (LOG.isDebugEnabled()) {
210+
LOG.debug("Invocation of listener {} failed. Leaving event publication uncompleted.", method, o_O);
211+
} else {
212+
LOG.info("Invocation of listener {} failed with message {}. Leaving event publication uncompleted.",
213+
method, o_O.getMessage());
214+
}
215+
216+
return null;
217+
}
218+
219+
private void markCompleted(Method method, Object event) {
220+
221+
// Mark publication complete if the method is a transactional event listener.
222+
String adapterId = ADAPTERS.get(method).getListenerId();
223+
PublicationTargetIdentifier identifier = PublicationTargetIdentifier.of(adapterId);
224+
registry.get().markCompleted(event, identifier);
225+
}
226+
199227
private static TransactionalApplicationListenerMethodAdapter createAdapter(Method method) {
200228
return new TransactionalApplicationListenerMethodAdapter(null, method.getDeclaringClass(), method);
201229
}

spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/support/CompletionRegisteringAdvisorUnitTests.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import static org.mockito.ArgumentMatchers.*;
2020
import static org.mockito.Mockito.*;
2121

22+
import java.util.concurrent.CompletableFuture;
2223
import java.util.function.BiConsumer;
2324

2425
import org.junit.jupiter.api.Test;
2526
import org.springframework.aop.framework.Advised;
2627
import org.springframework.aop.framework.ProxyFactory;
2728
import org.springframework.context.event.EventListener;
2829
import org.springframework.modulith.events.core.EventPublicationRegistry;
30+
import org.springframework.scheduling.annotation.Async;
2931
import org.springframework.transaction.event.TransactionPhase;
3032
import org.springframework.transaction.event.TransactionalEventListener;
3133

@@ -59,6 +61,34 @@ void doesNotTriggerCompletionForNonEventListener() {
5961
assertNonCompletion(SomeEventListener::nonEventListener);
6062
}
6163

64+
@Test // GH-399
65+
void doesNotTriggerCompletionOnFailedCompletableFuture() throws Throwable {
66+
67+
var result = createProxyFor(bean).asyncWithResult(true);
68+
69+
assertThat(result.isDone()).isFalse();
70+
verify(registry, never()).markCompleted(any(), any());
71+
72+
Thread.sleep(500);
73+
74+
assertThat(result.isCompletedExceptionally()).isTrue();
75+
verify(registry, never()).markCompleted(any(), any());
76+
}
77+
78+
@Test // GH-399
79+
void marksLazilyComputedCompletableFutureAsCompleted() throws Throwable {
80+
81+
var result = createProxyFor(bean).asyncWithResult(false);
82+
83+
assertThat(result.isDone()).isFalse();
84+
verify(registry, never()).markCompleted(any(), any());
85+
86+
Thread.sleep(500);
87+
88+
assertThat(result.isCompletedExceptionally()).isFalse();
89+
verify(registry).markCompleted(any(), any());
90+
}
91+
6292
private void assertCompletion(BiConsumer<SomeEventListener, Object> consumer) {
6393
assertCompletion(consumer, true);
6494
}
@@ -78,11 +108,12 @@ private void assertCompletion(BiConsumer<SomeEventListener, Object> consumer, bo
78108
verify(registry, times(expected ? 1 : 0)).markCompleted(any(), any());
79109
}
80110

81-
private Object createProxyFor(Object bean) {
111+
@SuppressWarnings("unchecked")
112+
private <T> T createProxyFor(T bean) {
82113

83114
ProxyFactory factory = new ProxyFactory(bean);
84115
factory.addAdvisor(new CompletionRegisteringAdvisor(() -> registry));
85-
return factory.getProxy();
116+
return (T) factory.getProxy();
86117
}
87118

88119
static class SomeEventListener {
@@ -97,5 +128,22 @@ void onAfterRollback(Object object) {}
97128
void simpleEventListener(Object object) {}
98129

99130
void nonEventListener(Object object) {}
131+
132+
@Async
133+
@TransactionalEventListener
134+
CompletableFuture<?> asyncWithResult(boolean fail) {
135+
136+
return CompletableFuture.completedFuture(new Object())
137+
.thenComposeAsync(it -> {
138+
139+
try {
140+
Thread.sleep(200);
141+
} catch (InterruptedException e) {}
142+
143+
return fail
144+
? CompletableFuture.failedFuture(new IllegalArgumentException())
145+
: CompletableFuture.completedFuture(it);
146+
});
147+
}
100148
}
101149
}

0 commit comments

Comments
 (0)