diff --git a/src/com/tunnelvisionlabs/util/concurrent/Async.java b/src/com/tunnelvisionlabs/util/concurrent/Async.java index 8406376..5793f36 100644 --- a/src/com/tunnelvisionlabs/util/concurrent/Async.java +++ b/src/com/tunnelvisionlabs/util/concurrent/Async.java @@ -107,12 +107,14 @@ public static Awaitable configureAwait(@NotNull CompletableFuture CompletableFuture runAsync(@NotNull Supplier> supplier) { + StrongBox> result = SharedPools.>strongBox().allocate(); try { - StrongBox> result = new StrongBox<>(); ExecutionContext.run(ExecutionContext.capture(), s -> result.value = s.get(), supplier); return result.value; } catch (Throwable ex) { return Futures.fromException(ex); + } finally { + SharedPools.>strongBox().free(result); } } diff --git a/src/com/tunnelvisionlabs/util/concurrent/AsyncLazy.java b/src/com/tunnelvisionlabs/util/concurrent/AsyncLazy.java index e55b919..80ce213 100644 --- a/src/com/tunnelvisionlabs/util/concurrent/AsyncLazy.java +++ b/src/com/tunnelvisionlabs/util/concurrent/AsyncLazy.java @@ -113,7 +113,7 @@ public final CompletableFuture getValueAsync(@NotNull CancellationT Verify.failOperation("ValueFactoryReentrancy"); } - final StrongBox resumableAwaiter = new StrongBox<>(); + InlineResumable resumableAwaiter = null; synchronized (syncObject) { // Note that if multiple threads hit getValueAsync() before // the valueFactory has completed its synchronous execution, @@ -125,11 +125,12 @@ public final CompletableFuture getValueAsync(@NotNull CancellationT return Futures.completedCancelled(); } - resumableAwaiter.value = new InlineResumable(); + resumableAwaiter = new InlineResumable(); + final InlineResumable finalResumableAwaiter = resumableAwaiter; Supplier> originalValueFactory = this.valueFactory.getAndSet(null); Supplier> localValueFactory = () -> Async.runAsync( () -> Async.awaitAsync( - resumableAwaiter.value, + finalResumableAwaiter, () -> Async.awaitAsync(Async.configureAwait(originalValueFactory.get(), false)))); this.recursiveFactoryCheck.setValue(RECURSIVE_CHECK_SENTINEL); @@ -154,8 +155,8 @@ public final CompletableFuture getValueAsync(@NotNull CancellationT } // Allow the original value factory to actually run. - if (resumableAwaiter.value != null) { - resumableAwaiter.value.resume(); + if (resumableAwaiter != null) { + resumableAwaiter.resume(); } } diff --git a/src/com/tunnelvisionlabs/util/concurrent/AsyncManualResetEvent.java b/src/com/tunnelvisionlabs/util/concurrent/AsyncManualResetEvent.java index be28e0e..ae75325 100644 --- a/src/com/tunnelvisionlabs/util/concurrent/AsyncManualResetEvent.java +++ b/src/com/tunnelvisionlabs/util/concurrent/AsyncManualResetEvent.java @@ -67,9 +67,10 @@ public AsyncManualResetEvent(boolean initialState, boolean allowInliningAwaiters future.complete(null); } - StrongBox> secondaryHandlerBox = new StrongBox<>(); + StrongBox> secondaryHandlerBox = SharedPools.>strongBox().allocate(); this.priorityFuture = Futures.createPriorityHandler(this.future, secondaryHandlerBox); this.secondaryFuture = secondaryHandlerBox.value; + SharedPools.>strongBox().free(secondaryHandlerBox); } /** @@ -131,9 +132,10 @@ public final void reset() { if (this.isSet) { this.future = this.createFuture(); - StrongBox> secondaryHandlerBox = new StrongBox<>(); + StrongBox> secondaryHandlerBox = SharedPools.>strongBox().allocate(); this.priorityFuture = Futures.createPriorityHandler(this.future, secondaryHandlerBox); this.secondaryFuture = secondaryHandlerBox.value; + SharedPools.>strongBox().free(secondaryHandlerBox); this.isSet = false; } @@ -162,9 +164,10 @@ final CompletableFuture pulseAllAsync() { this.future = this.createFuture(); - StrongBox> secondaryHandlerBox = new StrongBox<>(); + StrongBox> secondaryHandlerBox = SharedPools.>strongBox().allocate(); this.priorityFuture = Futures.createPriorityHandler(this.future, secondaryHandlerBox); this.secondaryFuture = secondaryHandlerBox.value; + SharedPools.>strongBox().free(secondaryHandlerBox); this.isSet = false; } diff --git a/src/com/tunnelvisionlabs/util/concurrent/ExecutionContext.java b/src/com/tunnelvisionlabs/util/concurrent/ExecutionContext.java index cb0ae2f..8e40eaf 100644 --- a/src/com/tunnelvisionlabs/util/concurrent/ExecutionContext.java +++ b/src/com/tunnelvisionlabs/util/concurrent/ExecutionContext.java @@ -240,9 +240,11 @@ public static Runnable wrap(@NotNull Runnable runnable) { public static Function wrap(@NotNull Function function) { ExecutionContext executionContext = capture(); return t -> { - StrongBox result = new StrongBox<>(); + StrongBox result = SharedPools.strongBox().allocate(); run(executionContext, state -> result.value = function.apply(t), null); - return result.value; + U value = result.value; + SharedPools.strongBox().free(result); + return value; }; } @@ -250,9 +252,11 @@ public static Function wrap(@NotNull Function function) { public static Supplier wrap(@NotNull Supplier supplier) { ExecutionContext executionContext = capture(); return () -> { - StrongBox result = new StrongBox<>(); + StrongBox result = SharedPools.strongBox().allocate(); run(executionContext, state -> result.value = supplier.get(), null); - return result.value; + T value = result.value; + SharedPools.strongBox().free(result); + return value; }; } diff --git a/src/com/tunnelvisionlabs/util/concurrent/JoinableFuture.java b/src/com/tunnelvisionlabs/util/concurrent/JoinableFuture.java index c356d2c..d82518f 100644 --- a/src/com/tunnelvisionlabs/util/concurrent/JoinableFuture.java +++ b/src/com/tunnelvisionlabs/util/concurrent/JoinableFuture.java @@ -703,10 +703,12 @@ final T completeOnCurrentThread() { // Don't use IsCompleted as the condition because that // includes queues of posted work that don't have to complete for the // JoinableTask to be ready to return from the JTF.Run method. - StrongBox>> visited = new StrongBox<>(); + StrongBox>> visited = SharedPools.>>strongBox().allocate(); + StrongBox> work = SharedPools.>strongBox().allocate(); + StrongBox> tryAgainAfter = SharedPools.>strongBox().allocate(); while (!isCompleteRequested()) { - StrongBox> work = new StrongBox<>(); - StrongBox> tryAgainAfter = new StrongBox<>(); + work.value = null; + tryAgainAfter.value = null; if (tryPollSelfOrDependencies(onMainThread, visited, work, tryAgainAfter)) { work.value.tryExecute(); } else if (tryAgainAfter.value != null) { @@ -716,6 +718,10 @@ final T completeOnCurrentThread() { assert tryAgainAfter.value.isDone(); } } + + SharedPools.>>strongBox().free(visited); + SharedPools.>strongBox().free(work); + SharedPools.>strongBox().free(tryAgainAfter); } finally { try (SpecializedSyncContext syncContext = SpecializedSyncContext.apply(getFactory().getContext().getNoMessagePumpSynchronizationContext())) { synchronized (owner.getContext().getSyncContextLock()) { @@ -1212,7 +1218,8 @@ private static void removeDependingSynchronousFutureFrom(List> reachableTasks = null; - final StrongBox>> remainTasks = new StrongBox<>(); + final StrongBox>> remainTasks = SharedPools.>>strongBox().allocate(); + final StrongBox>> remainPlaceHold = SharedPools.>>strongBox().allocate(); if (force) { reachableTasks = new HashSet<>(); @@ -1232,11 +1239,13 @@ private static void removeDependingSynchronousFutureFrom(List>> remainPlaceHold = new StrongBox<>(); for (JoinableFuture remainTask : remainTasks.value) { remainTask.removeDependingSynchronousFuture(syncTask, reachableTasks, remainPlaceHold); } } + + SharedPools.>>strongBox().free(remainTasks); + SharedPools.>>strongBox().free(remainPlaceHold); } /** diff --git a/src/com/tunnelvisionlabs/util/concurrent/JoinableFutureContext.java b/src/com/tunnelvisionlabs/util/concurrent/JoinableFutureContext.java index 63f98ae..3ba6e2a 100644 --- a/src/com/tunnelvisionlabs/util/concurrent/JoinableFutureContext.java +++ b/src/com/tunnelvisionlabs/util/concurrent/JoinableFutureContext.java @@ -590,8 +590,8 @@ public HangReportContribution getHangReport() { try (SpecializedSyncContext syncContext = SpecializedSyncContext.apply(getNoMessagePumpSynchronizationContext())) { synchronized (getSyncContextLock()) { try { - StrongBox nodes = new StrongBox<>(); - StrongBox links = new StrongBox<>(); + StrongBox nodes = SharedPools.strongBox().allocate(); + StrongBox links = SharedPools.strongBox().allocate(); Document dgml = createTemplateDgml(nodes, links); Map, Element> pendingTasksElements = createNodesForPendingFutures(dgml); @@ -621,6 +621,12 @@ public HangReportContribution getHangReport() { links.value.appendChild(pair.getValue()); } + SharedPools.strongBox().free(nodes); + nodes = null; + + SharedPools.strongBox().free(links); + links = null; + TransformerFactory transformerFactory = TransformerFactory.newInstance(); Transformer transformer = transformerFactory.newTransformer(); DOMSource source = new DOMSource(dgml); diff --git a/src/com/tunnelvisionlabs/util/concurrent/ObjectPool.java b/src/com/tunnelvisionlabs/util/concurrent/ObjectPool.java new file mode 100644 index 0000000..74913d5 --- /dev/null +++ b/src/com/tunnelvisionlabs/util/concurrent/ObjectPool.java @@ -0,0 +1,150 @@ +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +package com.tunnelvisionlabs.util.concurrent; + +import com.tunnelvisionlabs.util.validation.NotNull; +import com.tunnelvisionlabs.util.validation.Nullable; +import com.tunnelvisionlabs.util.validation.Requires; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * Generic implementation of object pooling pattern with predefined pool size limit. The main purpose is that limited + * number of frequently used objects can be kept in the pool for further recycling. + * + *

Notes:

+ * + *
    + *
  1. it is not the goal to keep all returned objects. Pool is not meant for storage. If there is no space in the pool, + * extra returned objects will be dropped.
  2. + *
  3. it is implied that if object was obtained from a pool, the caller will return it back in a relatively short time. + * Keeping checked out objects for long durations is OK, but reduces usefulness of pooling. Just new up your own.
  4. + *
+ * + *

Not returning objects to the pool in not detrimental to the pool's work, but is a bad practice. Rationale: If + * there is no intent for reusing the object, do not use pool - just use "new".

+ * + * @param The type of the objects in this cache. + */ +class ObjectPool { + private final AtomicReferenceArray items; + + /** + * {@code factory} is stored for the lifetime of the pool. We will call this only when pool needs to expand. + * Compared to {@link Class#newInstance()}, {@link Supplier} gives more flexibility to implementers and is faster + * than {@link Class#newInstance()}. + */ + private final Supplier factory; + + private final Consumer allocate; + + private final Consumer free; + + /** + * Storage for the pool objects. The first item is stored in a dedicated field because we expect to be able to + * satisfy most requests from it. + */ + private final AtomicReference firstItem = new AtomicReference<>(); + + ObjectPool(@NotNull Supplier factory) { + this(factory, null, null, Runtime.getRuntime().availableProcessors() * 2); + } + + ObjectPool(@NotNull Supplier factory, int size) { + this(factory, null, null, size); + } + + ObjectPool(@NotNull Supplier factory, @Nullable Consumer allocate, @Nullable Consumer free) { + this(factory, allocate, free, Runtime.getRuntime().availableProcessors() * 2); + } + + ObjectPool(@NotNull Supplier factory, @Nullable Consumer allocate, @Nullable Consumer free, int size) { + Requires.argument(size >= 1, "size", "The object pool can't be empty"); + this.factory = factory; + this.allocate = allocate; + this.free = free; + this.items = new AtomicReferenceArray<>(size - 1); + } + + /** + * Produces an instance. + * + *

Search strategy is a simple linear probing which is chosen for it cache-friendliness. Note that + * {@link #free(Object)} will try to store recycled objects close to the start thus statistically reducing how far + * we will typically search.

+ * + * @return A (possibly) cached instance of type {@code T}. + */ + final T allocate() { + // PERF: Examine the first element. If that fails, allocateSlow will look at the remaining elements. + T inst = firstItem.getAndSet(null); + if (inst == null) { + inst = allocateSlow(); + } + + if (allocate != null) { + allocate.accept(inst); + } + + return inst; + } + + /** + * Returns objects to the pool. + * + *

Search strategy is a simple linear probing which is chosen for it cache-friendliness. Note that + * {@link #free(Object)} will try to store recycled objects close to the start thus statistically reducing how far + * we will typically search in Allocate.

+ * + * @param obj The object to free. + */ + final void free(T obj) { + if (free != null) { + free.accept(obj); + } + + if (firstItem == null) { + // Intentionally not using compareAndSet here. + // In a worst case scenario two objects may be stored into same slot. + // It is very unlikely to happen and will only mean that one of the objects will get collected. + firstItem.lazySet(obj); + } else { + freeSlow(obj); + } + } + + private T createInstance() { + T inst = factory.get(); + return inst; + } + + private T allocateSlow() { + for (int i = 0; i < items.length(); i++) { + // Note that the initial read is optimistically not synchronized. That is intentional. + // We will interlock only when we have a candidate. in a worst case we may miss some + // recently returned objects. Not a big deal. + T inst = items.get(i); + if (inst != null) { + inst = items.getAndSet(i, null); + if (inst != null) { + return inst; + } + } + } + + return createInstance(); + } + + private void freeSlow(T obj) { + for (int i = 0; i < items.length(); i++) { + if (items.get(i) == null) { + // Intentionally not using compareAndSet here. + // In a worst case scenario two objects may be stored into same slot. + // It is very unlikely to happen and will only mean that one of the objects will get collected. + items.lazySet(i, obj); + break; + } + } + } +} diff --git a/src/com/tunnelvisionlabs/util/concurrent/SharedPools.java b/src/com/tunnelvisionlabs/util/concurrent/SharedPools.java new file mode 100644 index 0000000..4189028 --- /dev/null +++ b/src/com/tunnelvisionlabs/util/concurrent/SharedPools.java @@ -0,0 +1,77 @@ +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +package com.tunnelvisionlabs.util.concurrent; + +import com.tunnelvisionlabs.util.validation.NotNull; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * General shared object pools. + * + *

+ * Use this shared pool if only concern is reducing object allocations. If perf of an object pool itself is also a + * concern, use {@link ObjectPool} directly.

+ * + *

+ * For example, if you want to create a million of small objects within a second, use the {@link ObjectPool} directly. + * it should have much less overhead than using this.

+ */ +enum SharedPools { + ; + + private static final ObjectPool> STRONG_BOX_POOL = new ObjectPool<>( + () -> new StrongBox<>(), + null, + box -> box.value = null); + private static final ConcurrentHashMap, ObjectPool> BIG_POOLS = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap, ObjectPool> NORMAL_POOLS = new ConcurrentHashMap<>(); + + /** + * Pool that uses default constructor with 100 elements pooled. + * + * @param The type of the object pool. + * @param clazz The {@link Class} describing the type of the object pool. + * @return A default big object pool. + */ + public static ObjectPool bigDefault(@NotNull Class clazz) { + return getOrCreate(BIG_POOLS, clazz, 100); + } + + /** + * Pool that uses default constructor with 20 elements pooled. + * + * @param The type of the object pool. + * @param clazz The {@link Class} describing the type of the object pool. + * @return A default object pool. + */ + public static ObjectPool normal(@NotNull Class clazz) { + return getOrCreate(NORMAL_POOLS, clazz, 20); + } + + public static ObjectPool> strongBox() { + @SuppressWarnings(Suppressions.UNCHECKED_SAFE) + ObjectPool> strongBoxPool = (ObjectPool>)(ObjectPool)STRONG_BOX_POOL; + return strongBoxPool; + } + + @NotNull + private static ObjectPool getOrCreate(@NotNull ConcurrentHashMap, ObjectPool> cache, @NotNull Class clazz, int size) { + ObjectPool result = cache.get(clazz); + if (result == null) { + Supplier factory = () -> { + try { + return clazz.newInstance(); + } catch (InstantiationException | IllegalAccessException ex) { + throw new UnsupportedOperationException(ex); + } + }; + + cache.putIfAbsent(clazz, new ObjectPool<>(factory, size)); + result = cache.get(clazz); + } + + @SuppressWarnings(Suppressions.UNCHECKED_SAFE) + ObjectPool typedResult = (ObjectPool)result; + return typedResult; + } +}