diff --git a/src/main/java/org/apache/datasketches/filters/quotientfilter/QuotientFilter.java b/src/main/java/org/apache/datasketches/filters/quotientfilter/QuotientFilter.java index bd8c2abd9..a47bb5f6a 100644 --- a/src/main/java/org/apache/datasketches/filters/quotientfilter/QuotientFilter.java +++ b/src/main/java/org/apache/datasketches/filters/quotientfilter/QuotientFilter.java @@ -21,46 +21,34 @@ import java.util.ArrayList; import java.util.HashSet; +import java.util.LinkedList; +import java.util.Queue; import java.util.Set; +import org.apache.datasketches.common.SketchesException; import org.apache.datasketches.filters.common.BitArray; import org.apache.datasketches.filters.common.HeapBitArray; public class QuotientFilter extends Filter { - int bitPerEntry_; - int fingerprintLength_; + public static final double LOAD_FACTOR = 0.9; + + int numBitsPerEntry_; int powerOfTwoSize_; int numEntries_; + int numExpansions_; BitArray bitArray_; - double expansionThreshold_; - long maxEntriesBeforeExpansion_; - boolean expandAutonomously_; - boolean isFull_; - // statistics, computed in the compute_statistics method. method should be called before these are used long numRuns_; long numClusters_; public double avgRunLength_; public double avgClusterLength_; - int originalFingerprintSize_; - int numExpansions_; - - public QuotientFilter(final int powerOfTwo, final int bitsPerEntry) { + public QuotientFilter(final int powerOfTwo, final int numBitsPerEntry) { powerOfTwoSize_ = powerOfTwo; - bitPerEntry_ = bitsPerEntry; - fingerprintLength_ = bitsPerEntry - 3; - final long init_size = 1L << powerOfTwo; - bitArray_ = makeFilter(init_size, bitsPerEntry); - - expansionThreshold_ = 0.8; - maxEntriesBeforeExpansion_ = (int) (init_size * expansionThreshold_); - expandAutonomously_ = false; - isFull_ = false; - - originalFingerprintSize_ = fingerprintLength_; + numBitsPerEntry_ = numBitsPerEntry; + bitArray_ = makeFilter(getNumSlots(), numBitsPerEntry); numExpansions_ = 0; //hash_type = XxHash.hashLong ; //HashType.xxh; } @@ -73,16 +61,12 @@ public long getNumEntries() { return numEntries_; } - public long getMaxEntriesBeforeExpansion() { - return maxEntriesBeforeExpansion_; - } - - public boolean expandAutonomously() { - return expandAutonomously_; + public int getNumExpansions() { + return numExpansions_; } - public void setExpandAutonomously(final boolean val) { - expandAutonomously_ = val; + public long getMaxEntriesBeforeExpansion() { + return (long)(getNumSlots() * LOAD_FACTOR); } BitArray makeFilter(final long initSize, final int bitsPerEntry) { @@ -90,19 +74,40 @@ BitArray makeFilter(final long initSize, final int bitsPerEntry) { } public int getFingerprintLength() { - return fingerprintLength_; + return numBitsPerEntry_ - 3; } - QuotientFilter(final int powerOfTwo, final int bitsPerEntry, final BitArray bitArray) { + QuotientFilter(final int powerOfTwo, final int numBitsPerEntry, final BitArray bitArray) { powerOfTwoSize_ = powerOfTwo; - bitPerEntry_ = bitsPerEntry; - fingerprintLength_ = bitsPerEntry - 3; + numBitsPerEntry_ = numBitsPerEntry; bitArray_ = bitArray; } - boolean expand() { - isFull_ = true; - return false; + void expand() { + if (getFingerprintLength() < 2) throw new SketchesException("for expansion value must have at least 2 bits"); + QuotientFilter other = new QuotientFilter(powerOfTwoSize_ + 1, numBitsPerEntry_ - 1); + + long i = 0; + if (!isSlotEmpty(i)) { i = findClusterStart(i); } + + Queue fifo = new LinkedList(); + long count = 0; + while (count < numEntries_) { + if (!isSlotEmpty(i)) { + if (isOccupied(i)) { fifo.add(i); } + final long fingerprint = getFingerprint(i); + final long newQuotient = (fifo.element() << 1) | (fingerprint >> other.getFingerprintLength()); + final long newFingerprint = fingerprint & other.getFingerprintMask(); + other.insert(newFingerprint, newQuotient); + count++; + } + i = (i + 1) & getSlotMask(); + if (!fifo.isEmpty() && ! isContinuation(i)) { fifo.remove(); } + } + powerOfTwoSize_++; + numBitsPerEntry_--; + bitArray_ = other.bitArray_; + numExpansions_++; } // measures the number of bits per entry for the filter @@ -122,11 +127,9 @@ protected static double measureNumBitsPerEntry(final QuotientFilter current, fin //System.out.println(); numEntries += q.getNumEntries(); } - long init_size = 1L << current.powerOfTwoSize_; - long numBits = current.bitPerEntry_ * init_size; - for (QuotientFilter q : otherFilters) { - init_size = 1L << q.powerOfTwoSize_; - numBits += q.bitPerEntry_ * init_size; + long numBits = current.getNumBitsPerEntry() * current.getNumSlots(); + for (final QuotientFilter q : otherFilters) { + numBits += q.getNumBitsPerEntry() * q.getNumSlots(); } //System.out.println("total entries: \t\t" + num_entries); //System.out.println("total bits: \t\t" + num_bits); @@ -146,10 +149,14 @@ public long getNumSlots() { return 1L << powerOfTwoSize_; } - long getMask() { + long getSlotMask() { return getNumSlots() - 1; } + long getFingerprintMask() { + return (1L << getFingerprintLength()) - 1; + } + // sets the metadata flag bits for a given slot index void modifySlot(final boolean isOccupied, final boolean isContinuation, final boolean isShifted, final long index) { setOccupied(index, isOccupied); @@ -159,18 +166,18 @@ void modifySlot(final boolean isOccupied, final boolean isContinuation, final bo // sets the fingerprint for a given slot index void setFingerprint(final long index, final long fingerprint) { - bitArray_.setBits(index * bitPerEntry_ + 3, fingerprintLength_, fingerprint); + bitArray_.setBits(index * numBitsPerEntry_ + 3, getFingerprintLength(), fingerprint); } // print a nice representation of the filter that can be understood. // if vertical is on, each line will represent a slot - public String get_pretty_str(final boolean vertical) { + public String getPrettyStr(final boolean vertical) { final StringBuffer sbr = new StringBuffer(); - final long numBits = getNumSlots() * bitPerEntry_; + final long numBits = getNumSlots() * numBitsPerEntry_; for (long i = 0; i < numBits; i++) { - final long remainder = i % bitPerEntry_; + final long remainder = i % numBitsPerEntry_; if (remainder == 0) { - final long slot = i / bitPerEntry_; + final long slot = i / numBitsPerEntry_; sbr.append(" "); if (vertical) { sbr.append("\n" + String.format("%-10d", slot) + "\t"); @@ -187,17 +194,17 @@ public String get_pretty_str(final boolean vertical) { // print a representation of the filter that can be humanly read. public void prettyPrint() { - System.out.print(get_pretty_str(true)); + System.out.print(getPrettyStr(true)); } // return a fingerprint in a given slot index long getFingerprint(final long index) { - return bitArray_.getBits(index * bitPerEntry_ + 3, fingerprintLength_); + return bitArray_.getBits(index * numBitsPerEntry_ + 3, getFingerprintLength()); } // return an entire slot representation, including metadata flags and fingerprint long getSlot(final long index) { - return bitArray_.getBits(index * bitPerEntry_, bitPerEntry_); + return bitArray_.getBits(index * numBitsPerEntry_, numBitsPerEntry_); } // compare a fingerprint input to the fingerprint in some slot index @@ -215,15 +222,14 @@ void modifySlot(final boolean isOccupied, final boolean isContinuation, final bo // summarize some statistical measures about the filter public void printFilterSummary() { final long slots = getNumSlots(); - final long num_bits = slots * bitPerEntry_; - System.out.println("slots:\t" + slots); - System.out.println("entries:\t" + numEntries_); - System.out.println("bits\t:" + num_bits); - System.out.println("bits/entry\t:" + num_bits / (double)numEntries_); - System.out.println("FP length:\t" + fingerprintLength_); - System.out.println("Is full?\t" + isFull_); - final double capacity = numEntries_ / (double)(slots) ; - System.out.println("Capacity\t" + capacity); + final long numBits = slots * numBitsPerEntry_; + System.out.println("slots: " + slots); + System.out.println("bits: " + numBits); + System.out.println("bits/entry: " + numBits / (double)numEntries_); + System.out.println("FP length: " + getFingerprintLength()); + System.out.println("entries: " + numEntries_); + System.out.println("expansions: " + numExpansions_); + System.out.println("load: " + numEntries_ / (double)(slots)); computeStatistics(); //System.out.println("num runs: \t\t" + num_runs); //System.out.println("avg run length: \t" + avg_run_length); @@ -236,35 +242,35 @@ public void printFilterSummary() { */ @Override public long getSpaceUse() { - return getNumSlots() * bitPerEntry_; + return getNumSlots() * numBitsPerEntry_; } - public int getBitsPerEntry() { - return bitPerEntry_; + public int getNumBitsPerEntry() { + return numBitsPerEntry_; } boolean isOccupied(final long index) { - return bitArray_.getBit(index * bitPerEntry_); + return bitArray_.getBit(index * numBitsPerEntry_); } boolean isContinuation(final long index) { - return bitArray_.getBit(index * bitPerEntry_ + 1); + return bitArray_.getBit(index * numBitsPerEntry_ + 1); } boolean isShifted(final long index) { - return bitArray_.getBit(index * bitPerEntry_ + 2); + return bitArray_.getBit(index * numBitsPerEntry_ + 2); } void setOccupied(final long index, final boolean val) { - bitArray_.assignBit(index * bitPerEntry_, val); + bitArray_.assignBit(index * numBitsPerEntry_, val); } void setContinuation(final long index, final boolean val) { - bitArray_.assignBit(index * bitPerEntry_ + 1, val); + bitArray_.assignBit(index * numBitsPerEntry_ + 1, val); } void setShifted(final long index, final boolean val) { - bitArray_.assignBit(index * bitPerEntry_ + 2, val); + bitArray_.assignBit(index * numBitsPerEntry_ + 2, val); } boolean isSlotEmpty(final long index) { @@ -275,7 +281,7 @@ boolean isSlotEmpty(final long index) { // used by deletes long findClusterStart(long index) { while (isShifted(index)) { - index = (index - 1) & getMask(); + index = (index - 1) & getSlotMask(); } return index; } @@ -285,13 +291,13 @@ long findClusterStart(long index) { long findRunStart(long index) { int numRunsToSkip = 0; while (isShifted(index)) { - index = (index - 1) & getMask(); + index = (index - 1) & getSlotMask(); if (isOccupied(index)) { numRunsToSkip++; } } while (numRunsToSkip > 0) { - index = (index + 1) & getMask(); + index = (index + 1) & getSlotMask(); if (!isContinuation(index)) { numRunsToSkip--; } @@ -302,7 +308,7 @@ long findRunStart(long index) { // given the start of a run, scan the run and return the index of the first matching fingerprint // if not found returns the insertion position as bitwise complement to make it negative long findFirstFingerprintInRun(long index, final long fingerprint) { - assert(!isContinuation(index)); + assert !isContinuation(index); do { final long fingerprintAtIndex = getFingerprint(index); if (fingerprintAtIndex == fingerprint) { @@ -310,37 +316,35 @@ long findFirstFingerprintInRun(long index, final long fingerprint) { } else if (fingerprintAtIndex > fingerprint) { return ~index; } - index = (index + 1) & getMask(); + index = (index + 1) & getSlotMask(); } while (isContinuation(index)); return ~index; } // delete the last matching fingerprint in the run long decideWhichFingerprintToDelete(long index, final long fingerprint) { - assert(!isContinuation(index)); + assert !isContinuation(index); long matchingFingerprintIndex = -1; do { if (compare(index, fingerprint)) { - //System.out.println("found matching FP at index " + index); matchingFingerprintIndex = index; } - index = (index + 1) & getMask(); + index = (index + 1) & getSlotMask(); } while (isContinuation(index)); return matchingFingerprintIndex; } // given the start of a run, find the last slot index that still belongs to this run long findRunEnd(long index) { - while (isContinuation((index + 1) & getMask())) { - index = (index + 1) & getMask(); + while (isContinuation((index + 1) & getSlotMask())) { + index = (index + 1) & getSlotMask(); } return index; } // given a canonical index slot and a fingerprint, find the relevant run and check if there is a matching fingerprint within it - boolean search(long fingerprint, long index) { - final boolean doesRunExist = isOccupied(index); - if (!doesRunExist) { + boolean search(final long fingerprint, final long index) { + if (!isOccupied(index)) { return false; } final long runStartIndex = findRunStart(index); @@ -359,7 +363,7 @@ Set getAllFingerprints(final long bucketIndex) { long runIndex = findRunStart(bucketIndex); do { set.add(getFingerprint(runIndex)); - runIndex = (runIndex + 1) & getMask(); + runIndex = (runIndex + 1) & getSlotMask(); } while (isContinuation(runIndex)); return set; } @@ -368,18 +372,20 @@ boolean insert(final long fingerprint, final long index) { if (index >= getNumSlots() || numEntries_ == getNumSlots()) { return false; } - final long run_start = findRunStart(index); + final long runStart = findRunStart(index); if (!isOccupied(index)) { - return insertFingerprintAndPushAllElse(fingerprint, run_start, index, true, true); + insertFingerprintAndPushAllElse(fingerprint, runStart, index, true, true); + return true; } - final long found_index = findFirstFingerprintInRun(run_start, fingerprint); - if (found_index >= 0) { + final long foundIndex = findFirstFingerprintInRun(runStart, fingerprint); + if (foundIndex >= 0) { return false; } - return insertFingerprintAndPushAllElse(fingerprint, ~found_index, index, false, ~found_index == run_start); + insertFingerprintAndPushAllElse(fingerprint, ~foundIndex, index, false, ~foundIndex == runStart); + return true; } - boolean insertFingerprintAndPushAllElse(long fingerprint, long index, final long canonical, + void insertFingerprintAndPushAllElse(long fingerprint, long index, final long canonical, final boolean isNewRun, final boolean isRunStart) { // in the first shifted entry set isContinuation flag if inserting at the start of the existing run // otherwise just shift the existing flag as it is @@ -406,7 +412,7 @@ boolean insertFingerprintAndPushAllElse(long fingerprint, long index, final long isContinuation = existingIsContinuation | forceContinuation; isShifted = true; - index = (index + 1) & getMask(); + index = (index + 1) & getSlotMask(); // remember the existing entry to be shifted existingFingerprint = getFingerprint(index); @@ -424,7 +430,6 @@ boolean insertFingerprintAndPushAllElse(long fingerprint, long index, final long setOccupied(canonical, true); } numEntries_++; - return true; } boolean delete(final long fingerprint, final long canonicalSlot, long runStartIndex, long matchingFingerprintIndex) { @@ -435,8 +440,8 @@ boolean delete(final long fingerprint, final long canonicalSlot, long runStartIn boolean turnOffOccupied = runStartIndex == runEnd; // First thing to do is move everything else in the run back by one slot - for (long i = matchingFingerprintIndex; i != runEnd; i = (i + 1) & getMask()) { - long f = getFingerprint((i + 1) & getMask()); + for (long i = matchingFingerprintIndex; i != runEnd; i = (i + 1) & getSlotMask()) { + long f = getFingerprint((i + 1) & getSlotMask()); setFingerprint(i, f); } @@ -447,7 +452,7 @@ boolean delete(final long fingerprint, final long canonicalSlot, long runStartIn long clusterStart = findClusterStart(canonicalSlot); long numShiftedCount = 0; long numNonOccupied = 0; - for (long i = clusterStart; i != ((runEnd + 1) & getMask()); i = (i + 1) & getMask()) { + for (long i = clusterStart; i != ((runEnd + 1) & getSlotMask()); i = (i + 1) & getSlotMask()) { if (isContinuation(i)) { numShiftedCount++; } @@ -468,7 +473,7 @@ boolean delete(final long fingerprint, final long canonicalSlot, long runStartIn //boolean does_next_run_exist = !is_slot_empty(run_end + 1); //boolean is_next_run_shifted = is_shifted(run_end + 1); //if (!does_next_run_exist || !is_next_run_shifted) { - if (isSlotEmpty((runEnd + 1) & getMask()) || !isShifted((runEnd + 1) & getMask())) { + if (isSlotEmpty((runEnd + 1) & getSlotMask()) || !isShifted((runEnd + 1) & getSlotMask())) { if (turnOffOccupied) { // if we eliminated a run and now need to turn the isOccupied flag off, we do it at the end to not interfere in our counts setOccupied(canonicalSlot, false); @@ -477,22 +482,22 @@ boolean delete(final long fingerprint, final long canonicalSlot, long runStartIn } // we now find the start and end of the next run - final long nextRunStart = (runEnd + 1) & getMask(); + final long nextRunStart = (runEnd + 1) & getSlotMask(); runEnd = findRunEnd(nextRunStart); // before we start processing the next run, we check whether the previous run we shifted is now back to its canonical slot // The condition num_shifted_count - num_non_occupied == 1 ensures that the run was shifted by only 1 slot, meaning it is now back in its proper place - if (isOccupied((nextRunStart - 1) & getMask()) && numShiftedCount - numNonOccupied == 1) { - setShifted((nextRunStart - 1) & getMask(), false); + if (isOccupied((nextRunStart - 1) & getSlotMask()) && numShiftedCount - numNonOccupied == 1) { + setShifted((nextRunStart - 1) & getSlotMask(), false); } else { - setShifted((nextRunStart - 1) & getMask(), true); + setShifted((nextRunStart - 1) & getSlotMask(), true); } - for (long i = nextRunStart; i != ((runEnd + 1) & getMask()); i = (i + 1) & getMask()) { + for (long i = nextRunStart; i != ((runEnd + 1) & getSlotMask()); i = (i + 1) & getSlotMask()) { long f = getFingerprint(i); - setFingerprint((i - 1) & getMask(), f); + setFingerprint((i - 1) & getSlotMask(), f); if (isContinuation(i)) { - setContinuation((i - 1) & getMask(), true); + setContinuation((i - 1) & getSlotMask(), true); } if (!isOccupied(i)) { numNonOccupied++; @@ -522,22 +527,12 @@ boolean delete(final long fingerprint, final long canonicalSlot) { return delete(fingerprint, canonicalSlot, runStartIndex, matchingFingerprintIndex); } - /* - * Performs the modular arithmetic of large_hash % bits_per_entry and uses this as the slot_index - */ - long getSlotIndex(final long largeHash) { - return largeHash & getMask(); - } - - long genFingerprint(final long largeHash) { - long fingerprintMask = (1L << fingerprintLength_) - 1L; - fingerprintMask = fingerprintMask << powerOfTwoSize_; - return (largeHash & fingerprintMask) >> powerOfTwoSize_; + long getSlotFromHash(final long largeHash) { + return (largeHash >> getFingerprintLength()) & getSlotMask(); } - void setExpansionThreshold(double thresh) { - expansionThreshold_ = thresh; - maxEntriesBeforeExpansion_ = (long)(Math.pow(2, powerOfTwoSize_) * expansionThreshold_); + long getFingerprintFromHash(final long largeHash) { + return largeHash & getFingerprintMask(); } /* @@ -547,26 +542,20 @@ void setExpansionThreshold(double thresh) { Hence, the `large_hash` argument is already a hash key that has been generated by the hashing library (eg xxhash). */ - protected boolean _insert(long large_hash) { - if (isFull_) { - return false; - } - final long slotIndex = getSlotIndex(large_hash); - final long fingerprint = genFingerprint(large_hash); - boolean success = insert(fingerprint, slotIndex); - - if (expandAutonomously_ && numEntries_ >= maxEntriesBeforeExpansion_) { - final boolean expanded = expand(); - if (expanded) { - numExpansions_++; - } + protected boolean _insert(final long largeHash) { + final long slotIndex = getSlotFromHash(largeHash); + final long fingerprint = getFingerprintFromHash(largeHash); + final boolean success = insert(fingerprint, slotIndex); + + if (numEntries_ == getMaxEntriesBeforeExpansion()) { + expand(); } return success; } protected boolean _delete(final long largeHash) { - final long slotIndex = getSlotIndex(largeHash); - long fingerprint = genFingerprint(largeHash); + final long slotIndex = getSlotFromHash(largeHash); + long fingerprint = getFingerprintFromHash(largeHash); boolean success = delete(fingerprint, slotIndex); if (success) { numEntries_--; @@ -575,8 +564,8 @@ protected boolean _delete(final long largeHash) { } protected boolean _search(final long largeHash) { - final long slotIndex = getSlotIndex(largeHash); - long fingerprint = genFingerprint(largeHash); + final long slotIndex = getSlotFromHash(largeHash); + final long fingerprint = getFingerprintFromHash(largeHash); return search(fingerprint, slotIndex); } diff --git a/src/test/java/org/apache/datasketches/filters/quotientfilter/QuotientFilterTest.java b/src/test/java/org/apache/datasketches/filters/quotientfilter/QuotientFilterTest.java index 60db5be84..3e9644dfe 100644 --- a/src/test/java/org/apache/datasketches/filters/quotientfilter/QuotientFilterTest.java +++ b/src/test/java/org/apache/datasketches/filters/quotientfilter/QuotientFilterTest.java @@ -22,7 +22,6 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertEquals; -import java.util.ArrayList; import java.util.BitSet; import java.util.HashSet; import java.util.Random; @@ -261,7 +260,7 @@ static public BitSet set_slot_in_test(BitSet result, int bits_per_entry, int slo static public boolean check_equality(QuotientFilter qf, BitSet bs, boolean check_also_fingerprints) { for (int i = 0; i < bs.size(); i++) { - if (check_also_fingerprints || (i % qf.getBitsPerEntry() == 0 || i % qf.getBitsPerEntry() == 1 || i % qf.getBitsPerEntry() == 2)) { + if (check_also_fingerprints || (i % qf.getNumBitsPerEntry() == 0 || i % qf.getNumBitsPerEntry() == 1 || i % qf.getNumBitsPerEntry() == 2)) { if (qf.getBitAtOffset(i) != bs.get(i)) { return false; } @@ -297,4 +296,45 @@ static public boolean test_no_false_negatives(QuotientFilter filter, int num_ent } return true; } + + @Test + public void smallExpansion() { + final QuotientFilter qf = new QuotientFilter(5, 12); + final int n = 30; + for (int i = 0; i < n; i++) { qf.insert(i); } + qf.printFilterSummary(); + assertEquals(qf.getNumExpansions(), 1); + assertEquals(qf.getNumEntries(), n); + + // query the same keys + int positives = 0; + for (int i = 0; i < n; i++) { if (qf.search(i)) { positives++; } } + assertEquals(positives, n); + + // query novel keys + positives = 0; + for (int i = 0; i < n; i++) { if (qf.search(i + n)) { positives++; } } + assertTrue(positives < 2); + } + + @Test + public void expansion() { + final QuotientFilter qf = new QuotientFilter(16, 16); + final int n = 60000; + for (int i = 0; i < n; i++) { qf.insert(i); } +// qf.printFilterSummary(); + assertEquals(qf.getNumExpansions(), 1); + assertTrue(qf.getNumEntries() > n * 0.99); // allow a few hash collisions + + // query the same keys + int positives = 0; + for (int i = 0; i < n; i++) { if (qf.search(i)) { positives++; } } + assertEquals(positives, n); + + // query novel keys + positives = 0; + for (int i = 0; i < n; i++) { if (qf.search(i + n)) { positives++; } } + assertTrue(positives < 6); + } + }