Skip to content

Commit 7796b67

Browse files
committed
Fix usages of some async utilities
1 parent 2c4e38c commit 7796b67

File tree

5 files changed

+51
-55
lines changed

5 files changed

+51
-55
lines changed

spock-core/src/main/java/org/spockframework/runtime/extension/builtin/RepeatUntilFailureExtension.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,8 @@ public void runIterations(IDataIterator dataIterator, IIterationRunner iteration
3939
dataIterator.forEachRemaining(arguments::add);
4040
for (int attempt = 0; attempt < maxAttempts; attempt++) {
4141
for (Object[] args : arguments) {
42-
try {
43-
ExecutionResult executionResult = iterationRunner.runIteration(args, maxIterations).get();
44-
if (executionResult == ExecutionResult.FAILED) {
45-
return;
46-
}
47-
} catch (InterruptedException | ExecutionException e) {
42+
ExecutionResult executionResult = iterationRunner.runIteration(args, maxIterations).join();
43+
if (executionResult == ExecutionResult.FAILED) {
4844
return;
4945
}
5046
}

spock-core/src/main/java/org/spockframework/runtime/extension/builtin/TimeoutInterceptor.java

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -66,32 +66,56 @@ public void intercept(final IMethodInvocation invocation) throws Throwable {
6666
long timeoutAt = 0;
6767
int unsuccessfulInterruptAttempts = 0;
6868

69-
syncWithThread(startLatch, "feature", methodName);
69+
boolean syncedWithFeature = false;
70+
try {
71+
startLatch.countDown();
72+
syncedWithFeature = startLatch.await(5, TimeUnit.SECONDS);
73+
} catch (InterruptedException ignored) {
74+
// this is our own thread, so we can ignore the interruption safely
75+
}
76+
if (!syncedWithFeature) {
77+
System.out.printf("[spock.lang.Timeout] Could not sync with Feature for method '%s'", methodName);
78+
}
7079

80+
while (waitMillis > 0) {
81+
long waitStart = System.nanoTime();
82+
try {
83+
synced = sync.offer(stackTrace, waitMillis, TimeUnit.MILLISECONDS);
84+
} catch (InterruptedException ignored) {
85+
// this is our own thread, so we can ignore the interruption safely and continue the remaining waiting
86+
waitMillis -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - waitStart);
87+
continue;
88+
}
89+
break;
90+
}
91+
if (!synced) {
92+
stackTrace = mainThread.getStackTrace();
93+
waitMillis = 250;
94+
}
7195
while (!synced) {
96+
mainThread.interrupt();
7297
try {
7398
synced = sync.offer(stackTrace, waitMillis, TimeUnit.MILLISECONDS);
7499
} catch (InterruptedException ignored) {
75100
// The mission of this thread is to repeatedly interrupt the main thread until
76101
// the latter returns. Once this mission has been accomplished, this thread will die quickly
77102
}
78103
if (!synced) {
79-
long now = System.nanoTime();
80-
if (stackTrace.length == 0) {
81-
logMethodTimeout(methodName, timeoutSeconds);
82-
stackTrace = mainThread.getStackTrace();
83-
waitMillis = 250;
84-
timeoutAt = now;
85-
} else {
86-
waitMillis *= 2;
87-
logUnsuccessfulInterrupt(methodName, now, timeoutAt, waitMillis, ++unsuccessfulInterruptAttempts);
88-
}
89-
mainThread.interrupt();
104+
System.out.printf("[spock.lang.Timeout] Method '%s' has not yet returned - interrupting. Next try in %1.2f seconds.\n",
105+
methodName, waitMillis / 1000.);
90106
}
91107
}
92108
}).start();
93109

94-
syncWithThread(startLatch, "watcher", methodName);
110+
boolean syncedWithWatcher = false;
111+
try {
112+
startLatch.countDown();
113+
syncedWithWatcher = startLatch.await(5, TimeUnit.SECONDS);
114+
} finally {
115+
if (!syncedWithWatcher) {
116+
System.out.printf("[spock.lang.Timeout] Could not sync with Watcher for method '%s'", invocation.getMethod().getName());
117+
}
118+
}
95119

96120
Throwable saved = null;
97121
try {
@@ -211,13 +235,4 @@ private static Pair<Integer, Integer> findThreadSection(List<String> lines, Stri
211235

212236
return null;
213237
}
214-
215-
private static void syncWithThread(CountDownLatch startLatch, String threadName, String methodName) {
216-
try {
217-
startLatch.countDown();
218-
startLatch.await(5, TimeUnit.SECONDS);
219-
} catch (InterruptedException ignored) {
220-
System.out.printf("[spock.lang.Timeout] Could not sync with %s thread for method '%s'", threadName, methodName);
221-
}
222-
}
223238
}

spock-core/src/main/java/spock/util/concurrent/AsyncConditions.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,14 @@ public void await() throws Throwable {
131131
* @throws Throwable the first exception thrown by an evaluate block
132132
*/
133133
public void await(double seconds) throws Throwable {
134-
latch.await((long) (seconds * 1000), TimeUnit.MILLISECONDS);
134+
boolean evalBlocksFinished = latch.await((long) (seconds * 1000), TimeUnit.MILLISECONDS);
135135
if (!exceptions.isEmpty())
136136
throw exceptions.poll();
137137

138-
long pendingEvalBlocks = latch.getCount();
139-
if (pendingEvalBlocks > 0) {
138+
if (!evalBlocksFinished) {
140139
String msg = String.format("Async conditions timed out " +
141140
"after %1.2f seconds; %d out of %d evaluate blocks did not complete in time",
142-
seconds, pendingEvalBlocks, numEvalBlocks);
141+
seconds, latch.getCount(), numEvalBlocks);
143142
throw new SpockTimeoutError(seconds, msg);
144143
}
145144
}

spock-specs/src/test/groovy/org/spockframework/smoke/extension/ParallelSpec.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ class ParallelSpec extends EmbeddedSpecification {
265265
@ResourceLock(value = "a", mode = ResourceAccessMode.READ)
266266
def writeA() {
267267
when:
268-
incrementAndBlock(atomicInteger, latch)
268+
incrementAndBlock(atomicInteger, latch, 10000)
269269
270270
then:
271271
atomicInteger.get() == 3
@@ -519,15 +519,15 @@ class ParallelSpec extends EmbeddedSpecification {
519519
throws InterruptedException {
520520
int value = sharedResource.incrementAndGet()
521521
countDownLatch.countDown()
522-
countDownLatch.await(timeout, MILLISECONDS)
522+
assert countDownLatch.await(timeout, MILLISECONDS) : 'Timeout expired'
523523
return value
524524
}
525525

526526
static void storeAndBlockAndCheck(AtomicInteger sharedResource, CountDownLatch countDownLatch, long timeout = 100)
527527
throws InterruptedException {
528528
int value = sharedResource.get()
529529
countDownLatch.countDown()
530-
countDownLatch.await(timeout, MILLISECONDS)
530+
assert countDownLatch.await(timeout, MILLISECONDS) : 'Timeout expired'
531531
assert value == sharedResource.get()
532532
}
533533
}

spock-specs/src/test/groovy/org/spockframework/smoke/mock/InvokingMocksFromMultipleThreads.groovy

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,11 @@ class InvokingMocksFromMultipleThreads extends Specification {
3333
def numThreads = 10
3434
def list = Mock(List)
3535
def latch = new CountDownLatch(numThreads)
36-
@AutoCleanup("shutdownNow")
37-
@Shared
38-
def executorService = createExecutorService()
3936

4037
def "invoking a mock from multiple threads"() {
4138
when:
4239
numThreads.times { threadId ->
43-
executorService.submit {
40+
Thread.start {
4441
try {
4542
100.times { count ->
4643
list.add(count)
@@ -51,7 +48,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
5148
}
5249
}
5350
}
54-
awaitLatch()
51+
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'
5552

5653
then:
5754
interaction {
@@ -63,7 +60,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
6360
def "invoking a mock from multiple threads - too many invocations"() {
6461
when:
6562
numThreads.times { threadId ->
66-
executorService.submit {
63+
Thread.start {
6764
try {
6865
100.times { count ->
6966
list.add(count)
@@ -77,7 +74,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
7774
}
7875
}
7976
}
80-
awaitLatch()
77+
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'
8178

8279
then:
8380
interaction {
@@ -89,7 +86,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
8986
def "invoking a mock from multiple threads - too few invocations"() {
9087
when:
9188
numThreads.times { threadId ->
92-
executorService.submit {
89+
Thread.start {
9390
try {
9491
100.times { count ->
9592
if (!(threadId == 0 && count == 99)) list.add(count)
@@ -102,22 +99,11 @@ class InvokingMocksFromMultipleThreads extends Specification {
10299
}
103100
}
104101
}
105-
awaitLatch()
102+
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'
106103

107104
then:
108105
interaction {
109106
100.times { count -> numThreads * list.add(count) }
110107
}
111108
}
112-
113-
114-
private static ExecutorService createExecutorService() {
115-
return Jvm.current.java21Compatible ? Executors."newVirtualThreadPerTaskExecutor"() : Executors.newCachedThreadPool() { new Thread(it).tap { it.daemon = true } }
116-
}
117-
118-
private void awaitLatch() {
119-
if (!latch.await(WAIT_TIME_S, TimeUnit.SECONDS)) {
120-
throw new IllegalStateException("The test threads did not terminate in ${WAIT_TIME_S}s.")
121-
}
122-
}
123109
}

0 commit comments

Comments
 (0)