Object pool #9

Open
wants to merge 2 commits into base: master

4 changes: 3 additions & 1 deletion src/com/tunnelvisionlabs/util/concurrent/Async.java
@@ -107,12 +107,14 @@ public static <T> Awaitable<T> configureAwait(@NotNull CompletableFuture<? exten

@NotNull
public static <T> CompletableFuture<T> runAsync(@NotNull Supplier<? extends CompletableFuture<T>> supplier) {
+ StrongBox<CompletableFuture<T>> result = SharedPools.<CompletableFuture<T>>strongBox().allocate();
try {
- StrongBox<CompletableFuture<T>> result = new StrongBox<>();
ExecutionContext.run(ExecutionContext.capture(), s -> result.value = s.get(), supplier);
return result.value;
} catch (Throwable ex) {
return Futures.fromException(ex);
+ } finally {
+ SharedPools.<CompletableFuture<T>>strongBox().free(result);
}
}

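For context, the StrongBox type being pooled here is not shown in this diff. Judging from the call sites (result.value = ..., secondaryHandlerBox.value), it is a plain mutable single-field holder; a minimal sketch of what it presumably looks like (the constructors are assumptions):

    package com.tunnelvisionlabs.util.concurrent;

    // Sketch only: a mutable box used as an "out parameter" holder by the hunks in this pull request.
    public class StrongBox<T> {
        // The held value; call sites read and write this field directly.
        public T value;

        public StrongBox() {
        }

        public StrongBox(T value) {
            this.value = value;
        }
    }

Because such a box is just a mutable holder, pooling it is safe as long as each instance is freed only after its value has been read out, which is the pattern the try/finally block above follows.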
11 changes: 6 additions & 5 deletions src/com/tunnelvisionlabs/util/concurrent/AsyncLazy.java
@@ -113,7 +113,7 @@ public final CompletableFuture<? extends T> getValueAsync(@NotNull CancellationT
Verify.failOperation("ValueFactoryReentrancy");
}

- final StrongBox<InlineResumable> resumableAwaiter = new StrongBox<>();
+ InlineResumable resumableAwaiter = null;
synchronized (syncObject) {
// Note that if multiple threads hit getValueAsync() before
// the valueFactory has completed its synchronous execution,
@@ -125,11 +125,12 @@ public final CompletableFuture<? extends T> getValueAsync(@NotNull CancellationT
return Futures.completedCancelled();
}

- resumableAwaiter.value = new InlineResumable();
+ resumableAwaiter = new InlineResumable();
+ final InlineResumable finalResumableAwaiter = resumableAwaiter;
Supplier<? extends CompletableFuture<? extends T>> originalValueFactory = this.valueFactory.getAndSet(null);
Supplier<? extends CompletableFuture<? extends T>> localValueFactory = () -> Async.runAsync(
() -> Async.awaitAsync(
- resumableAwaiter.value,
+ finalResumableAwaiter,
() -> Async.awaitAsync(Async.configureAwait(originalValueFactory.get(), false))));

this.recursiveFactoryCheck.setValue(RECURSIVE_CHECK_SENTINEL);
@@ -154,8 +155,8 @@ public final CompletableFuture<? extends T> getValueAsync(@NotNull CancellationT
}

// Allow the original value factory to actually run.
- if (resumableAwaiter.value != null) {
- resumableAwaiter.value.resume();
+ if (resumableAwaiter != null) {
+ resumableAwaiter.resume();
}
}

src/com/tunnelvisionlabs/util/concurrent/AsyncManualResetEvent.java
@@ -67,9 +67,10 @@ public AsyncManualResetEvent(boolean initialState, boolean allowInliningAwaiters
future.complete(null);
}

- StrongBox<CompletableFuture<Void>> secondaryHandlerBox = new StrongBox<>();
+ StrongBox<CompletableFuture<Void>> secondaryHandlerBox = SharedPools.<CompletableFuture<Void>>strongBox().allocate();
this.priorityFuture = Futures.createPriorityHandler(this.future, secondaryHandlerBox);
this.secondaryFuture = secondaryHandlerBox.value;
+ SharedPools.<CompletableFuture<Void>>strongBox().free(secondaryHandlerBox);
}

/**
@@ -131,9 +132,10 @@ public final void reset() {
if (this.isSet) {
this.future = this.createFuture();

- StrongBox<CompletableFuture<Void>> secondaryHandlerBox = new StrongBox<>();
+ StrongBox<CompletableFuture<Void>> secondaryHandlerBox = SharedPools.<CompletableFuture<Void>>strongBox().allocate();
this.priorityFuture = Futures.createPriorityHandler(this.future, secondaryHandlerBox);
this.secondaryFuture = secondaryHandlerBox.value;
+ SharedPools.<CompletableFuture<Void>>strongBox().free(secondaryHandlerBox);

this.isSet = false;
}
@@ -162,9 +164,10 @@ final CompletableFuture<Void> pulseAllAsync() {

this.future = this.createFuture();

- StrongBox<CompletableFuture<Void>> secondaryHandlerBox = new StrongBox<>();
+ StrongBox<CompletableFuture<Void>> secondaryHandlerBox = SharedPools.<CompletableFuture<Void>>strongBox().allocate();
this.priorityFuture = Futures.createPriorityHandler(this.future, secondaryHandlerBox);
this.secondaryFuture = secondaryHandlerBox.value;
+ SharedPools.<CompletableFuture<Void>>strongBox().free(secondaryHandlerBox);

this.isSet = false;
}
12 changes: 8 additions & 4 deletions src/com/tunnelvisionlabs/util/concurrent/ExecutionContext.java
@@ -240,19 +240,23 @@ public static Runnable wrap(@NotNull Runnable runnable) {
public static <T, U> Function<T, U> wrap(@NotNull Function<T, U> function) {
ExecutionContext executionContext = capture();
return t -> {
- StrongBox<U> result = new StrongBox<>();
+ StrongBox<U> result = SharedPools.<U>strongBox().allocate();
run(executionContext, state -> result.value = function.apply(t), null);
- return result.value;
+ U value = result.value;
+ SharedPools.<U>strongBox().free(result);
+ return value;
};
}

@NotNull
public static <T> Supplier<T> wrap(@NotNull Supplier<T> supplier) {
ExecutionContext executionContext = capture();
return () -> {
- StrongBox<T> result = new StrongBox<>();
+ StrongBox<T> result = SharedPools.<T>strongBox().allocate();
run(executionContext, state -> result.value = supplier.get(), null);
- return result.value;
+ T value = result.value;
+ SharedPools.<T>strongBox().free(result);
+ return value;
};
}

19 changes: 14 additions & 5 deletions src/com/tunnelvisionlabs/util/concurrent/JoinableFuture.java
@@ -703,10 +703,12 @@ final T completeOnCurrentThread() {
// Don't use IsCompleted as the condition because that
// includes queues of posted work that don't have to complete for the
// JoinableTask to be ready to return from the JTF.Run method.
- StrongBox<Set<JoinableFuture<?>>> visited = new StrongBox<>();
+ StrongBox<Set<JoinableFuture<?>>> visited = SharedPools.<Set<JoinableFuture<?>>>strongBox().allocate();
+ StrongBox<SingleExecuteProtector<?>> work = SharedPools.<SingleExecuteProtector<?>>strongBox().allocate();
+ StrongBox<CompletableFuture<?>> tryAgainAfter = SharedPools.<CompletableFuture<?>>strongBox().allocate();
while (!isCompleteRequested()) {
- StrongBox<SingleExecuteProtector<?>> work = new StrongBox<>();
- StrongBox<CompletableFuture<?>> tryAgainAfter = new StrongBox<>();
+ work.value = null;
+ tryAgainAfter.value = null;
if (tryPollSelfOrDependencies(onMainThread, visited, work, tryAgainAfter)) {
work.value.tryExecute();
} else if (tryAgainAfter.value != null) {
@@ -716,6 +718,10 @@
assert tryAgainAfter.value.isDone();
}
}

+ SharedPools.<Set<JoinableFuture<?>>>strongBox().free(visited);
+ SharedPools.<SingleExecuteProtector<?>>strongBox().free(work);
+ SharedPools.<CompletableFuture<?>>strongBox().free(tryAgainAfter);
} finally {
try (SpecializedSyncContext syncContext = SpecializedSyncContext.apply(getFactory().getContext().getNoMessagePumpSynchronizationContext())) {
synchronized (owner.getContext().getSyncContextLock()) {
@@ -1212,7 +1218,8 @@ private static void removeDependingSynchronousFutureFrom(List<? extends Joinable
Requires.notNull(syncTask, "synchronousFuture");

Set<JoinableFuture<?>> reachableTasks = null;
- final StrongBox<Set<JoinableFuture<?>>> remainTasks = new StrongBox<>();
+ final StrongBox<Set<JoinableFuture<?>>> remainTasks = SharedPools.<Set<JoinableFuture<?>>>strongBox().allocate();
+ final StrongBox<Set<JoinableFuture<?>>> remainPlaceHold = SharedPools.<Set<JoinableFuture<?>>>strongBox().allocate();

if (force) {
reachableTasks = new HashSet<>();
@@ -1232,11 +1239,13 @@ private static void removeDependingSynchronousFutureFrom(List<? extends Joinable
syncTask.computeSelfAndDescendentOrJoinedJobsAndRemainFutures(reachableTasks, remainTasks.value);

// force to remove all invalid items
- final StrongBox<Set<JoinableFuture<?>>> remainPlaceHold = new StrongBox<>();
for (JoinableFuture<?> remainTask : remainTasks.value) {
remainTask.removeDependingSynchronousFuture(syncTask, reachableTasks, remainPlaceHold);
}
}

+ SharedPools.<Set<JoinableFuture<?>>>strongBox().free(remainTasks);
+ SharedPools.<Set<JoinableFuture<?>>>strongBox().free(remainPlaceHold);
}

/**
@@ -590,8 +590,8 @@ public HangReportContribution getHangReport() {
try (SpecializedSyncContext syncContext = SpecializedSyncContext.apply(getNoMessagePumpSynchronizationContext())) {
synchronized (getSyncContextLock()) {
try {
- StrongBox<Element> nodes = new StrongBox<>();
- StrongBox<Element> links = new StrongBox<>();
+ StrongBox<Element> nodes = SharedPools.<Element>strongBox().allocate();
+ StrongBox<Element> links = SharedPools.<Element>strongBox().allocate();
Document dgml = createTemplateDgml(nodes, links);

Map<JoinableFuture<?>, Element> pendingTasksElements = createNodesForPendingFutures(dgml);
@@ -621,6 +621,12 @@ public HangReportContribution getHangReport() {
links.value.appendChild(pair.getValue());
}

+ SharedPools.<Element>strongBox().free(nodes);
+ nodes = null;
+
+ SharedPools.<Element>strongBox().free(links);
+ links = null;

TransformerFactory transformerFactory = TransformerFactory.newInstance();
Transformer transformer = transformerFactory.newTransformer();
DOMSource source = new DOMSource(dgml);
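Every modified file above obtains and releases its StrongBox through a SharedPools.strongBox() helper that is not included in this excerpt. A minimal sketch of what that helper could look like, assuming it shares a single ObjectPool of StrongBox instances across all element types and clears each box before it re-enters the pool (the class layout and field names are assumptions):

    package com.tunnelvisionlabs.util.concurrent;

    // Hypothetical sketch of the SharedPools helper referenced by the call sites above.
    final class SharedPools {
        // One untyped pool serves every element type; the free callback clears the box so a
        // pooled box never retains a stale reference.
        private static final ObjectPool<StrongBox<Object>> STRONG_BOXES =
                new ObjectPool<>(StrongBox::new, null, box -> box.value = null);

        private SharedPools() {
        }

        @SuppressWarnings("unchecked")
        static <T> ObjectPool<StrongBox<T>> strongBox() {
            // The unchecked cast is tolerable only because every box is cleared before reuse.
            return (ObjectPool<StrongBox<T>>) (ObjectPool<?>) STRONG_BOXES;
        }
    }

Clearing the box on free also keeps pooled boxes from holding completed futures or task sets reachable longer than necessary.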
150 changes: 150 additions & 0 deletions src/com/tunnelvisionlabs/util/concurrent/ObjectPool.java
@@ -0,0 +1,150 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.tunnelvisionlabs.util.concurrent;

import com.tunnelvisionlabs.util.validation.NotNull;
import com.tunnelvisionlabs.util.validation.Nullable;
import com.tunnelvisionlabs.util.validation.Requires;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Generic implementation of the object pooling pattern with a predefined pool size limit. The main purpose is that a
 * limited number of frequently used objects can be kept in the pool for further recycling.
*
* <p>Notes:</p>
*
* <ol>
 * <li>it is not the goal to keep all returned objects. The pool is not meant for storage. If there is no space in the
 * pool, extra returned objects will be dropped.</li>
 * <li>it is implied that if an object was obtained from the pool, the caller will return it in a relatively short time.
 * Keeping checked-out objects for long durations is OK, but it reduces the usefulness of pooling; in that case, just
 * create a new instance instead.</li>
* </ol>
*
 * <p>Not returning objects to the pool is not detrimental to the pool's work, but it is a bad practice. Rationale: if
 * there is no intent to reuse the object, do not use the pool - just use "new".</p>
*
* @param <T> The type of the objects in this cache.
*/
class ObjectPool<T> {
private final AtomicReferenceArray<T> items;

/**
 * {@code factory} is stored for the lifetime of the pool. We will call this only when the pool needs to expand.
 * Compared to {@link Class#newInstance()}, a {@link Supplier} gives more flexibility to implementers and is faster.
*/
private final Supplier<? extends T> factory;

private final Consumer<? super T> allocate;

private final Consumer<? super T> free;

/**
* Storage for the pool objects. The first item is stored in a dedicated field because we expect to be able to
* satisfy most requests from it.
*/
private final AtomicReference<T> firstItem = new AtomicReference<>();

ObjectPool(@NotNull Supplier<? extends T> factory) {
this(factory, null, null, Runtime.getRuntime().availableProcessors() * 2);
}

ObjectPool(@NotNull Supplier<? extends T> factory, int size) {
this(factory, null, null, size);
}

ObjectPool(@NotNull Supplier<? extends T> factory, @Nullable Consumer<? super T> allocate, @Nullable Consumer<? super T> free) {
this(factory, allocate, free, Runtime.getRuntime().availableProcessors() * 2);
}

ObjectPool(@NotNull Supplier<? extends T> factory, @Nullable Consumer<? super T> allocate, @Nullable Consumer<? super T> free, int size) {
Requires.argument(size >= 1, "size", "The object pool can't be empty");
this.factory = factory;
this.allocate = allocate;
this.free = free;
this.items = new AtomicReferenceArray<>(size - 1);
}

/**
* Produces an instance.
*
 * <p>The search strategy is simple linear probing, chosen for its cache-friendliness. Note that
 * {@link #free(Object)} will try to store recycled objects close to the start, thus statistically reducing how far
 * we will typically search.</p>
*
* @return A (possibly) cached instance of type {@code T}.
*/
final T allocate() {
// PERF: Examine the first element. If that fails, allocateSlow will look at the remaining elements.
T inst = firstItem.getAndSet(null);
if (inst == null) {
inst = allocateSlow();
}

if (allocate != null) {
allocate.accept(inst);
}

return inst;
}

/**
* Returns objects to the pool.
*
 * <p>The search strategy is simple linear probing, chosen for its cache-friendliness. Note that
 * {@link #free(Object)} will try to store recycled objects close to the start, thus statistically reducing how far
 * we will typically search in {@link #allocate()}.</p>
*
* @param obj The object to free.
*/
final void free(T obj) {
if (free != null) {
free.accept(obj);
}

if (firstItem.get() == null) {
// Intentionally not using compareAndSet here.
// In a worst case scenario two objects may be stored into the same slot.
// It is very unlikely to happen and will only mean that one of the objects will get collected.
firstItem.lazySet(obj);
} else {
freeSlow(obj);
}
}

private T createInstance() {
T inst = factory.get();
return inst;
}

private T allocateSlow() {
for (int i = 0; i < items.length(); i++) {
// Note that the initial read is optimistically not synchronized. That is intentional.
// We will interlock only when we have a candidate. In the worst case we may miss some
// recently returned objects. Not a big deal.
T inst = items.get(i);
if (inst != null) {
inst = items.getAndSet(i, null);
if (inst != null) {
return inst;
}
}
}

return createInstance();
}

private void freeSlow(T obj) {
for (int i = 0; i < items.length(); i++) {
if (items.get(i) == null) {
// Intentionally not using compareAndSet here.
// In a worst case scenario two objects may be stored into the same slot.
// It is very unlikely to happen and will only mean that one of the objects will get collected.
items.lazySet(i, obj);
break;
}
}
}
}
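To make the allocate/free contract concrete, a short usage sketch (not part of the pull request) that follows the same allocate/try/finally/free pattern as the files above, with a free callback that resets each instance before it re-enters the pool:

    package com.tunnelvisionlabs.util.concurrent;

    // Illustrative only: exercises ObjectPool directly with StringBuilder instances.
    final class ObjectPoolExample {
        public static void main(String[] args) {
            // The third argument resets pooled instances so the next caller gets a clean object.
            ObjectPool<StringBuilder> pool =
                    new ObjectPool<>(StringBuilder::new, null, sb -> sb.setLength(0));

            StringBuilder sb = pool.allocate();
            try {
                sb.append("hello, ").append("pool");
                System.out.println(sb);
            } finally {
                // Return promptly, as the class javadoc recommends; if the pool is full,
                // the instance is simply dropped and left to the garbage collector.
                pool.free(sb);
            }
        }
    }

Note the design choice in free() and freeSlow(): the pool uses lazySet rather than compareAndSet and accepts that, in a rare race, one of two concurrently freed objects may be dropped; losing an instance only costs an extra factory call later, while avoiding a CAS presumably keeps the common free path cheap.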