Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class AllocationManager {
private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
private final int size;
private final UnsafeDirectLittleEndian underlying;
private final IdentityHashMap<BufferAllocator, BufferLedger> map = new IdentityHashMap<>();
private final AllocatorLedgerMapWrap map = new AllocatorLedgerMapWrap();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
Expand Down Expand Up @@ -140,7 +140,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
return existingLedger;
}

final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
final BufferLedger ledger = new BufferLedger(allocator);
if (retain) {
ledger.inc();
}
Expand All @@ -151,54 +151,44 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
}
}


/**
* The way that a particular BufferLedger communicates back to the AllocationManager that it
* now longer needs to hold
* a reference to particular piece of memory.
* Can only be called when you already hold the writeLock.
*/
private class ReleaseListener {

private final BufferAllocator allocator;

public ReleaseListener(BufferAllocator allocator) {
this.allocator = allocator;
}

/**
* Can only be called when you already hold the writeLock.
*/
public void release() {
allocator.assertOpen();
private void release(final BufferLedger ledger) {
final BaseAllocator allocator = ledger.getAllocator();
allocator.assertOpen();

final BufferLedger oldLedger = map.remove(allocator);
oldLedger.allocator.dissociateLedger(oldLedger);
final BufferLedger oldLedger = map.remove(allocator);
oldLedger.allocator.dissociateLedger(oldLedger);

if (oldLedger == owningLedger) {
if (map.isEmpty()) {
// no one else owns, lets release.
oldLedger.allocator.releaseBytes(size);
underlying.release();
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
// we need to change the owning allocator. we've been removed so we'll get whatever is
// top of list
BufferLedger newLedger = map.values().iterator().next();

// we'll forcefully transfer the ownership and not worry about whether we exceeded the
// limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
if (oldLedger == owningLedger) {
if (map.isEmpty()) {
// no one else owns, lets release.
oldLedger.allocator.releaseBytes(size);
underlying.release();
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
if (map.isEmpty()) {
throw new IllegalStateException("The final removal of a ledger should be connected to " +
"the owning ledger.");
}
// we need to change the owning allocator. we've been removed so we'll get whatever is
// top of list
// AllocatorLedgerMapWrap doe snot support map.values().iterator().next(); in some cases
// use custom method: getNextValue()
//BufferLedger newLedger = map.values().iterator().next();
BufferLedger newLedger = map.getNextValue();

// we'll forcefully transfer the ownership and not worry about whether we exceeded the
// limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
} else {
if (map.isEmpty()) {
throw new IllegalStateException("The final removal of a ledger should be connected to " +
"the owning ledger.");
}


}
}

Expand All @@ -220,17 +210,23 @@ public class BufferLedger {
// manage request for retain
// correctly
private final long lCreationTime = System.nanoTime();
private final BaseAllocator allocator;
private final ReleaseListener listener;
final BaseAllocator allocator;
private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog
(BaseAllocator.DEBUG_LOG_LENGTH,
"BufferLedger[%d]", 1)
: null;
private volatile long lDestructionTime = 0;

private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
private BufferLedger(BaseAllocator allocator) {
this.allocator = allocator;
this.listener = listener;
}

/**
* Get the allocator for this ledger
* @return allocator
*/
private BaseAllocator getAllocator() {
return allocator;
}

/**
Expand Down Expand Up @@ -339,7 +335,7 @@ public int decrement(int decrement) {
outcome = bufRefCnt.addAndGet(-decrement);
if (outcome == 0) {
lDestructionTime = System.nanoTime();
listener.release();
release(this);
}
}

Expand Down Expand Up @@ -463,4 +459,4 @@ boolean isOwningLedger() {

}

}
}
Loading