diff --git a/src/com/tunnelvisionlabs/util/concurrent/ThreadingTools.java b/src/com/tunnelvisionlabs/util/concurrent/ThreadingTools.java index 2fb0fd2..bcb49b9 100644 --- a/src/com/tunnelvisionlabs/util/concurrent/ThreadingTools.java +++ b/src/com/tunnelvisionlabs/util/concurrent/ThreadingTools.java @@ -13,6 +13,8 @@ public enum ThreadingTools { ; + private static final Disposable EMPTY_DISPOSABLE = () -> { }; + // /// // /// Optimistically performs some value transformation based on some field and tries to apply it back to the field, // /// retrying as many times as necessary until no other thread is manipulating the same field. @@ -49,6 +51,18 @@ public enum ThreadingTools { // return true; // } + public static Disposable interruptOnCancel(@NotNull CancellationToken cancellationToken) { + return interruptOnCancel(Thread.currentThread(), cancellationToken); + } + + public static Disposable interruptOnCancel(@NotNull Thread thread, @NotNull CancellationToken cancellationToken) { + if (!cancellationToken.canBeCancelled()) { + return EMPTY_DISPOSABLE; + } + + return cancellationToken.register(thread::interrupt); + } + /** * Wraps a future with one that will complete as canceled based on a cancellation token, allowing someone to await * a task but be able to break out early by canceling the token. diff --git a/test/com/tunnelvisionlabs/util/concurrent/AsyncTest.java b/test/com/tunnelvisionlabs/util/concurrent/AsyncTest.java index 25f163a..070662a 100644 --- a/test/com/tunnelvisionlabs/util/concurrent/AsyncTest.java +++ b/test/com/tunnelvisionlabs/util/concurrent/AsyncTest.java @@ -1,6 +1,8 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. package com.tunnelvisionlabs.util.concurrent; +import com.tunnelvisionlabs.util.validation.NotNull; +import java.time.Duration; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -143,4 +145,31 @@ public Void getResult() { asyncTest.join(); } + + @NotNull + private CompletableFuture sleepingMethodAsync(@NotNull Duration duration, @NotNull CancellationToken cancellationToken) { + return Async.runAsync(() -> { + try (Disposable registration = ThreadingTools.interruptOnCancel(cancellationToken)) { + try { + Thread.sleep(duration.toMillis()); + } catch (InterruptedException ex) { + return Futures.completedCancelled(); + } + } + + return Futures.completedNull(); + }); + } + + @Test + public void testCancelSleep() { + CompletableFuture asyncTest = Async.runAsync(() -> { + return Async.awaitAsync(AsyncAssert.assertCancelsAsync(() -> { + CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(Duration.ofMillis(20)); + return sleepingMethodAsync(Duration.ofMinutes(5), cancellationTokenSource.getToken()); + })); + }); + + asyncTest.join(); + } }