Skip to content

Commit 2aacdcd

Browse files
authored
Fixed a deadlock in the PoolBasedSequentialScheduledExecutorService (openhab#4247)
Signed-off-by: Jörg Sautter <joerg.sautter@gmx.net>
1 parent 8e3ca9d commit 2aacdcd

File tree

2 files changed

+104
-22
lines changed

2 files changed

+104
-22
lines changed

bundles/org.openhab.core/src/main/java/org/openhab/core/common/PoolBasedSequentialScheduledExecutorService.java

Lines changed: 72 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.IdentityHashMap;
1919
import java.util.List;
2020
import java.util.Set;
21+
import java.util.WeakHashMap;
2122
import java.util.concurrent.Callable;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.ExecutionException;
@@ -31,9 +32,11 @@
3132
import java.util.concurrent.ScheduledThreadPoolExecutor;
3233
import java.util.concurrent.TimeUnit;
3334
import java.util.concurrent.TimeoutException;
35+
import java.util.concurrent.atomic.AtomicInteger;
3436
import java.util.function.BiFunction;
3537
import java.util.function.Function;
3638

39+
import org.eclipse.jdt.annotation.NonNull;
3740
import org.eclipse.jdt.annotation.NonNullByDefault;
3841
import org.eclipse.jdt.annotation.Nullable;
3942

@@ -50,10 +53,13 @@
5053
* @author Jörg Sautter - Initial contribution
5154
*/
5255
@NonNullByDefault
53-
class PoolBasedSequentialScheduledExecutorService implements ScheduledExecutorService {
56+
final class PoolBasedSequentialScheduledExecutorService implements ScheduledExecutorService {
57+
58+
private static final WeakHashMap<ScheduledThreadPoolExecutor, @NonNull AtomicInteger> PENDING_BY_POOL = new WeakHashMap<>();
5459

5560
private final WorkQueueEntry empty;
5661
private final ScheduledThreadPoolExecutor pool;
62+
private final AtomicInteger pending;
5763
private final List<RunnableFuture<?>> scheduled;
5864
private final ScheduledFuture<?> cleaner;
5965
private @Nullable WorkQueueEntry tail;
@@ -75,6 +81,20 @@ public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor p
7581

7682
tail = empty;
7783

84+
// we need one pending counter per pool
85+
synchronized (PENDING_BY_POOL) {
86+
AtomicInteger fromCache = PENDING_BY_POOL.get(pool);
87+
88+
if (fromCache == null) {
89+
// set to one does ensure at least one thread more than tasks running
90+
fromCache = new AtomicInteger(1);
91+
92+
PENDING_BY_POOL.put(pool, fromCache);
93+
}
94+
95+
pending = fromCache;
96+
}
97+
7898
// clean up to ensure we do not keep references to old tasks
7999
cleaner = this.scheduleWithFixedDelay(() -> {
80100
synchronized (this) {
@@ -100,22 +120,24 @@ public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor p
100120
tail = empty;
101121
}
102122
}
103-
}, 2, 4, TimeUnit.SECONDS);
123+
},
124+
// avoid cleaners of promptly created instances to run at the same time
125+
(System.nanoTime() % 13), 8, TimeUnit.SECONDS);
104126
}
105127

106128
@Override
107129
public ScheduledFuture<?> schedule(@Nullable Runnable command, long delay, @Nullable TimeUnit unit) {
108130
return schedule((origin) -> pool.schedule(() -> {
109-
// we block the thread here, in worst case new threads are spawned
110-
submitToWorkQueue(origin.join(), command).join();
131+
// we might block the thread here, in worst case new threads are spawned
132+
submitToWorkQueue(origin.join(), command, true).join();
111133
}, delay, unit));
112134
}
113135

114136
@Override
115137
public <V> ScheduledFuture<V> schedule(@Nullable Callable<V> callable, long delay, @Nullable TimeUnit unit) {
116138
return schedule((origin) -> pool.schedule(() -> {
117-
// we block the thread here, in worst case new threads are spawned
118-
return submitToWorkQueue(origin.join(), callable).join();
139+
// we might block the thread here, in worst case new threads are spawned
140+
return submitToWorkQueue(origin.join(), callable, true).join();
119141
}, delay, unit));
120142
}
121143

@@ -126,13 +148,13 @@ public ScheduledFuture<?> scheduleAtFixedRate(@Nullable Runnable command, long i
126148
CompletableFuture<?> submitted;
127149

128150
try {
129-
// we block the thread here, in worst case new threads are spawned
130-
submitted = submitToWorkQueue(origin.join(), command);
151+
submitted = submitToWorkQueue(origin.join(), command, true);
131152
} catch (RejectedExecutionException ex) {
132153
// the pool has been shutdown, scheduled tasks should cancel
133154
return;
134155
}
135156

157+
// we might block the thread here, in worst case new threads are spawned
136158
submitted.join();
137159
}, initialDelay, period, unit));
138160
}
@@ -144,13 +166,13 @@ public ScheduledFuture<?> scheduleWithFixedDelay(@Nullable Runnable command, lon
144166
CompletableFuture<?> submitted;
145167

146168
try {
147-
// we block the thread here, in worst case new threads are spawned
148-
submitted = submitToWorkQueue(origin.join(), command);
169+
submitted = submitToWorkQueue(origin.join(), command, true);
149170
} catch (RejectedExecutionException ex) {
150171
// the pool has been shutdown, scheduled tasks should cancel
151172
return;
152173
}
153174

175+
// we might block the thread here, in worst case new threads are spawned
154176
submitted.join();
155177
}, initialDelay, delay, unit));
156178
}
@@ -255,20 +277,21 @@ public boolean awaitTermination(long timeout, @Nullable TimeUnit unit) throws In
255277

256278
@Override
257279
public <T> Future<T> submit(@Nullable Callable<T> task) {
258-
return submitToWorkQueue(null, task);
280+
return submitToWorkQueue(null, task, false);
259281
}
260282

261-
private CompletableFuture<?> submitToWorkQueue(RunnableFuture<?> origin, @Nullable Runnable task) {
283+
private CompletableFuture<?> submitToWorkQueue(RunnableFuture<?> origin, @Nullable Runnable task, boolean inPool) {
262284
Callable<?> callable = () -> {
263285
task.run();
264286

265287
return null;
266288
};
267289

268-
return submitToWorkQueue(origin, callable);
290+
return submitToWorkQueue(origin, callable, inPool);
269291
}
270292

271-
private <T> CompletableFuture<T> submitToWorkQueue(@Nullable RunnableFuture<?> origin, @Nullable Callable<T> task) {
293+
private <T> CompletableFuture<T> submitToWorkQueue(@Nullable RunnableFuture<?> origin, @Nullable Callable<T> task,
294+
boolean inPool) {
272295
BiFunction<? super Object, Throwable, T> action = (result, error) -> {
273296
// ignore result & error, they are from the previous task
274297
try {
@@ -278,22 +301,45 @@ private <T> CompletableFuture<T> submitToWorkQueue(@Nullable RunnableFuture<?> o
278301
} catch (Exception ex) {
279302
// a small hack to throw the Exception unchecked
280303
throw PoolBasedSequentialScheduledExecutorService.unchecked(ex);
304+
} finally {
305+
pending.decrementAndGet();
281306
}
282307
};
283308

309+
RunnableCompletableFuture<T> cf;
310+
boolean runNow;
311+
284312
synchronized (this) {
285313
if (tail == null) {
286314
throw new RejectedExecutionException("this scheduled executor has been shutdown before");
287315
}
288316

289-
RunnableCompletableFuture<T> cf = tail.future.handleAsync(action, pool);
317+
// set the core pool size even if it does not change, this triggers idle threads to stop
318+
pool.setCorePoolSize(pending.incrementAndGet());
290319

291-
cf.setCallable(task);
320+
// avoid waiting for one pool thread to finish inside a pool thread
321+
runNow = inPool && tail.future.isDone();
292322

293-
tail = new WorkQueueEntry(tail, origin, cf);
323+
if (runNow) {
324+
cf = new RunnableCompletableFuture<>(task);
325+
tail = new WorkQueueEntry(null, origin, cf);
326+
} else {
327+
cf = tail.future.handleAsync(action, pool);
328+
cf.setCallable(task);
329+
tail = new WorkQueueEntry(tail, origin, cf);
330+
}
331+
}
294332

295-
return cf;
333+
if (runNow) {
334+
// ensure we do not wait for one pool thread to finish inside another pool thread
335+
try {
336+
cf.run();
337+
} finally {
338+
pending.decrementAndGet();
339+
}
296340
}
341+
342+
return cf;
297343
}
298344

299345
private static <E extends RuntimeException> E unchecked(Exception ex) throws E {
@@ -306,7 +352,7 @@ public <T> Future<T> submit(@Nullable Runnable task, T result) {
306352
task.run();
307353

308354
return result;
309-
});
355+
}, false);
310356
}
311357

312358
@Override
@@ -343,7 +389,7 @@ public <T> List<Future<T>> invokeAll(@Nullable Collection<? extends @Nullable Ca
343389
List<Future<T>> futures = new ArrayList<>();
344390

345391
for (Callable<T> task : tasks) {
346-
futures.add(submitToWorkQueue(null, task).orTimeout(timeout, unit));
392+
futures.add(submitToWorkQueue(null, task, false).orTimeout(timeout, unit));
347393
}
348394

349395
// wait for all futures to complete
@@ -381,7 +427,7 @@ private <T> T invokeAny(@Nullable Collection<? extends @Nullable Callable<T>> ta
381427
List<CompletableFuture<T>> futures = new ArrayList<>();
382428

383429
for (Callable<T> task : tasks) {
384-
futures.add(submitToWorkQueue(null, task));
430+
futures.add(submitToWorkQueue(null, task, false));
385431
}
386432

387433
// wait for any future to complete
@@ -452,7 +498,11 @@ static class RunnableCompletableFuture<V> extends CompletableFuture<V> implement
452498
private @Nullable Callable<V> callable;
453499

454500
public RunnableCompletableFuture() {
455-
callable = null;
501+
this.callable = null;
502+
}
503+
504+
public RunnableCompletableFuture(@Nullable Callable<V> callable) {
505+
this.callable = callable;
456506
}
457507

458508
public void setCallable(@Nullable Callable<V> callable) {

bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,38 @@ public void testSchedulingGetsBlockedByRegularTaskInSequentialExecutorService()
174174
assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
175175
}
176176

177+
@Test
178+
public void testSchedulingDoesSpawnNewThreads() throws InterruptedException {
179+
ScheduledExecutorService serviceA = ThreadPoolManager
180+
.getPoolBasedSequentialScheduledExecutorService("unit-test", "thread-7");
181+
ScheduledExecutorService serviceB = ThreadPoolManager
182+
.getPoolBasedSequentialScheduledExecutorService("unit-test", "thread-8");
183+
184+
for (int j = 0; j < 3; j++) {
185+
CountDownLatch block = new CountDownLatch(1);
186+
CountDownLatch check = new CountDownLatch(20);
187+
188+
for (int i = 0; i < 20; i++) {
189+
serviceA.schedule(() -> {
190+
try {
191+
block.await();
192+
} catch (InterruptedException e) {
193+
194+
}
195+
check.countDown();
196+
}, 1, TimeUnit.MILLISECONDS);
197+
}
198+
199+
Thread.sleep(80);
200+
201+
serviceB.schedule(() -> {
202+
block.countDown();
203+
}, 20, TimeUnit.MILLISECONDS);
204+
205+
assertTrue(check.await(80, TimeUnit.MILLISECONDS));
206+
}
207+
}
208+
177209
@Test
178210
public void testGetScheduledPool() {
179211
ThreadPoolExecutor result = ThreadPoolManager.getScheduledPoolUnwrapped("test1");

0 commit comments

Comments
 (0)