Skip to content

implemented merge #582

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@
import java.util.Queue;
import java.util.Set;

import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.common.SketchesException;
import org.apache.datasketches.filters.common.BitArray;
import org.apache.datasketches.filters.common.HeapBitArray;

public class QuotientFilter extends Filter {

public static final float DEFAULT_LOAD_FACTOR = 0.8f;
public static final double DEFAULT_LOAD_FACTOR = 0.8;

int lgQ_;
int numFingerprintBits_;
float loadFactor_;
double loadFactor_;
int numEntries_;
int numExpansions_;
BitArray bitArray_;
Expand All @@ -50,7 +51,7 @@ public QuotientFilter(final int lgQ, final int numFingerprintBits) {
this(lgQ, numFingerprintBits, DEFAULT_LOAD_FACTOR);
}

public QuotientFilter(final int lgQ, final int numFingerprintBits, final float loadFactor) {
public QuotientFilter(final int lgQ, final int numFingerprintBits, final double loadFactor) {
lgQ_ = lgQ;
numFingerprintBits_ = numFingerprintBits;
loadFactor_ = loadFactor;
Expand Down Expand Up @@ -83,12 +84,6 @@ public int getFingerprintLength() {
return numFingerprintBits_;
}

// QuotientFilter(final int powerOfTwo, final int numBitsPerEntry, final BitArray bitArray) {
// powerOfTwoSize_ = powerOfTwo;
// numBitsPerEntry_ = numBitsPerEntry;
// bitArray_ = bitArray;
// }

void expand() {
if (getFingerprintLength() < 2) throw new SketchesException("for expansion value must have at least 2 bits");
final QuotientFilter other = new QuotientFilter(lgQ_ + 1, numFingerprintBits_ - 1, loadFactor_);
Expand Down Expand Up @@ -150,6 +145,14 @@ public double getUtilization() {
return numEntries_ / (double) getNumSlots();
}

public int getLgQ() {
return lgQ_;
}

public double getLoadFactor() {
return loadFactor_;
}

// returns the number of slots in the filter without the extension/buffer slots
public long getNumSlots() {
return 1L << lgQ_;
Expand Down Expand Up @@ -220,7 +223,7 @@ protected boolean compare(final long index, final long fingerprint) {

// modify the flags and fingerprint of a given slot
void modifySlot(final boolean isOccupied, final boolean isContinuation, final boolean isShifted,
final long index, final long fingerprint) {
final long index, final long fingerprint) {
modifySlot(isOccupied, isContinuation, isShifted, index);
setFingerprint(index, fingerprint);
}
Expand All @@ -229,13 +232,14 @@ void modifySlot(final boolean isOccupied, final boolean isContinuation, final bo
public void printFilterSummary() {
final long slots = getNumSlots();
final long numBits = slots * getNumBitsPerEntry();
System.out.println("slots: " + slots);
System.out.println("bits: " + numBits);
System.out.println("bits/entry: " + numBits / (double)numEntries_);
System.out.println("FP length: " + getFingerprintLength());
System.out.println("entries: " + numEntries_);
System.out.println("expansions: " + numExpansions_);
System.out.println("load: " + numEntries_ / (double)(slots));
System.out.println("lgQ: " + lgQ_);
System.out.println("FP length: " + getFingerprintLength());
System.out.println("load factor: " + getLoadFactor());
System.out.println("bits: " + numBits);
System.out.println("bits/entry: " + numBits / (double)numEntries_);
System.out.println("entries: " + numEntries_);
System.out.println("expansions: " + numExpansions_);
System.out.println("load: " + numEntries_ / (double)(slots));
computeStatistics();
//System.out.println("num runs: \t\t" + num_runs);
//System.out.println("avg run length: \t" + avg_run_length);
Expand Down Expand Up @@ -320,7 +324,7 @@ long findFirstFingerprintInRun(long index, final long fingerprint) {
if (fingerprintAtIndex == fingerprint) {
return index;
} else if (fingerprintAtIndex > fingerprint) {
return ~index;
return ~index;
}
index = (index + 1) & getSlotMask();
} while (isContinuation(index));
Expand Down Expand Up @@ -392,7 +396,7 @@ boolean insert(final long fingerprint, final long index) {
}

void insertFingerprintAndPushAllElse(long fingerprint, long index, final long canonical,
final boolean isNewRun, final boolean isRunStart) {
final boolean isNewRun, final boolean isRunStart) {
// in the first shifted entry set isContinuation flag if inserting at the start of the existing run
// otherwise just shift the existing flag as it is
boolean forceContinuation = !isNewRun && isRunStart;
Expand Down Expand Up @@ -632,4 +636,27 @@ public void computeStatistics() {
avgClusterLength_ = sumClusterLengths / numClusters_;
}

}
public void merge(final QuotientFilter other) {
if (lgQ_ + numFingerprintBits_ != other.lgQ_ + other.numFingerprintBits_) {
throw new SketchesArgumentException("incompatible sketches in merge");
}
long i = 0;
if (!other.isSlotEmpty(i)) { i = other.findClusterStart(i); }

final Queue<Long> fifo = new LinkedList<Long>();
long count = 0;
while (count < other.numEntries_) {
if (!other.isSlotEmpty(i)) {
if (other.isOccupied(i)) { fifo.add(i); }
final long quotient = fifo.element();
final long fingerprint = other.getFingerprint(i);
final long hash = quotient << other.getFingerprintLength() | fingerprint;
System.out.println("q=" + quotient + ", fp=" + fingerprint + ", hash=" + hash);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this print statement be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

_insert(hash);
count++;
}
i = (i + 1) & other.getSlotMask();
if (!fifo.isEmpty() && ! other.isContinuation(i)) { fifo.remove(); }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
*/

package org.apache.datasketches.filters.quotientfilter;
//import java.util.concurrent.ThreadLocalRandom;

import static org.apache.datasketches.filters.quotientfilter.QuotientFilter.DEFAULT_LOAD_FACTOR;
import org.apache.datasketches.common.SketchesArgumentException;

/**
Expand Down Expand Up @@ -60,11 +59,11 @@ public static byte suggestFingerprintLength(double targetFalsePositiveProb) {
* @param maxDistinctItems The maximum number of distinct items that can be inserted into the filter.
* @return The log-base-2 of the number of slots in the filter.
*/
public static byte suggestLgNumSlots(long maxDistinctItems) {
public static byte suggestLgNumSlots(long maxDistinctItems, double loadFactor) {
if (maxDistinctItems <= 0) {
throw new SketchesArgumentException("maxDistinctItems must be strictly positive");
}
byte result = (byte) Math.ceil(Math.log(maxDistinctItems / 0.9) / Math.log(2));
byte result = (byte) Math.ceil(Math.log(maxDistinctItems / loadFactor) / Math.log(2));
if (result < 31) {
return result;
} else {
Expand All @@ -73,19 +72,27 @@ public static byte suggestLgNumSlots(long maxDistinctItems) {
}
}

public static byte suggestLgNumSlots(long maxDistinctItems) {
return suggestLgNumSlots(maxDistinctItems, DEFAULT_LOAD_FACTOR);
}

/*
Returns the largest number of unique items that can be inserted into the filter.
We use a predefined load factor of 0.9 compared to the number of slots as 2^j.
@param lgNumSlots The log-base-2 of the number of slots in the filter
@return The maximum number of items that can be inserted into the filter
*/
public static long suggestMaxNumItemsFromNumSlots(byte lgNumSlots) {
public static long suggestMaxNumItemsFromNumSlots(int lgNumSlots, double loadFactor) {
if (lgNumSlots <= 0) {
throw new SketchesArgumentException("lgNumSlots must be at least 1.");
} else if (lgNumSlots >= 31) {
throw new SketchesArgumentException("lgNumSlots cannot exceed 2^31 - 1.");
}
return (long) Math.floor(0.9 * Math.pow(2, lgNumSlots));
return (long) (loadFactor * (1L<<lgNumSlots));
}

public static long suggestMaxNumItemsFromNumSlots(byte lgNumSlots) {
return suggestMaxNumItemsFromNumSlots(lgNumSlots, DEFAULT_LOAD_FACTOR);
}


Expand All @@ -95,21 +102,29 @@ public static long suggestMaxNumItemsFromNumSlots(byte lgNumSlots) {
* The results are returned as a QFPair object.
*
* @param maxDistinctItems The maximum number of distinct items that can be inserted into the filter.
* @param loadFactor The load factor to use when calculating the number of slots.
* @param targetFalsePositiveProb The desired false positive probability per item.
* @return A QFPair object containing the suggested number of slots (lgNumSlots) and the suggested fingerprint length.
* @throws SketchesArgumentException if the input parameters are not valid.
*/
public static QFPair suggestParamsFromMaxDistinctsFPP(long maxDistinctItems, double targetFalsePositiveProb) {
validateAccuracyInputs(maxDistinctItems, targetFalsePositiveProb);
byte lgNumSlots = suggestLgNumSlots(maxDistinctItems);
public static QFPair suggestParamsFromMaxDistinctsFPP(long maxDistinctItems, double loadFactor, double targetFalsePositiveProb) {
validateAccuracyInputs(maxDistinctItems, loadFactor, targetFalsePositiveProb);
byte lgNumSlots = suggestLgNumSlots(maxDistinctItems, loadFactor);
byte fingerprintLength = suggestFingerprintLength(targetFalsePositiveProb);
return new QFPair(lgNumSlots, fingerprintLength);
}

private static void validateAccuracyInputs(final long maxDistinctItems, final double targetFalsePositiveProb) {
public static QFPair suggestParamsFromMaxDistinctsFPP(long maxDistinctItems, double targetFalsePositiveProb) {
return suggestParamsFromMaxDistinctsFPP(maxDistinctItems, DEFAULT_LOAD_FACTOR, targetFalsePositiveProb);
}

private static void validateAccuracyInputs(final long maxDistinctItems, final double loadFactor, final double targetFalsePositiveProb) {
if (maxDistinctItems <= 0) {
throw new SketchesArgumentException("maxDistinctItems must be strictly positive");
}
if (loadFactor <=0.0 || loadFactor >= 1.0) {
throw new SketchesArgumentException("loadFactor must be larger than 0 and less than 1");
}
if (targetFalsePositiveProb <= 0.0 || targetFalsePositiveProb > 1.0) {
throw new SketchesArgumentException("targetFalsePositiveProb must be a valid probability and strictly greater than 0");
}
Expand All @@ -130,4 +145,4 @@ public QFPair(byte lgNumSlots, byte fingerprintLength) {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public static void testSuggestLgNumSlots(){
QuotientFilterBuilder qfb = new QuotientFilterBuilder();

// invalid number of items
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(0,0.9));
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(-1, 0.9));
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(5000000000L, 0.9));
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(0));
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(-1));
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(5000000000L));
Expand All @@ -55,7 +58,9 @@ public static void testSuggestLgNumSlots(){

for (int i = 0; i < numItems.length; i++) {
long num = numItems[i];
byte result = qfb.suggestLgNumSlots(num);
byte result = qfb.suggestLgNumSlots(num, 0.9);
assertEquals(result, results[i]);
result = qfb.suggestLgNumSlots(num);
assertEquals(result, results[i]);
}
}
Expand All @@ -70,12 +75,15 @@ public static void testSuggestMaxNumItems(){
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestMaxNumItemsFromNumSlots((byte)32));


byte[] lgNumSlots = {1, 2, 3, 6, 10, 15, 25, 30,};
long[] results = {1, 3, 7, 57, 921, 29491, 30198988, 966367641} ;
int[] lgNumSlots = {1, 2, 3, 6, 10, 15, 25, 30,};
long[] results_ninety_pc = {1, 3, 7, 57, 921, 29491, 30198988, 966367641} ;
long[] results_eighty_pc = {1, 3, 6, 51, 819, 26214, 26843545, 858993459} ;

for (int i = 0; i < lgNumSlots.length; i++) {
long result = qfb.suggestMaxNumItemsFromNumSlots(lgNumSlots[i]);
assertEquals(result, results[i]);
long result_ninety = qfb.suggestMaxNumItemsFromNumSlots(lgNumSlots[i], 0.9);
long result_eighty = qfb.suggestMaxNumItemsFromNumSlots(lgNumSlots[i], 0.8);
assertEquals(result_ninety, results_ninety_pc[i]);
assertEquals(result_eighty, results_eighty_pc[i]);
}
}

Expand All @@ -96,18 +104,26 @@ public static void testSuggestParamsFromMaxDistinctsFPP(){
double[] fpp = {1E-10, 1E-2, 1e-7} ;

// expected outcomes
byte[] expected_lgNumSlots = {1, 10, 30} ;
byte[] expected_lgNumSlotsNinety = {1, 10, 30} ;
byte[] expected_lgNumSlotsEighty = {1, 11, 30} ;
byte[] expected_fingerprintLength = {34, 7, 24} ;

for (int i = 0; i < numItems.length; i++) {
QuotientFilterBuilder.QFPair pair = qfb.suggestParamsFromMaxDistinctsFPP(numItems[i], fpp[i]);
QuotientFilterBuilder.QFPair pair = qfb.suggestParamsFromMaxDistinctsFPP(numItems[i], 0.9, fpp[i]);
lgNumSlots = pair.lgNumSlots;
fingerprintLength = pair.fingerprintLength;
assertEquals(expected_lgNumSlotsNinety[i], lgNumSlots);
assertEquals(expected_fingerprintLength[i], fingerprintLength);

// 80% load
pair = qfb.suggestParamsFromMaxDistinctsFPP(numItems[i], fpp[i]);
lgNumSlots = pair.lgNumSlots;
fingerprintLength = pair.fingerprintLength;
assertEquals(expected_lgNumSlots[i], lgNumSlots);
assertEquals(expected_lgNumSlotsEighty[i], lgNumSlots);
assertEquals(expected_fingerprintLength[i], fingerprintLength);
}
}



}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

package org.apache.datasketches.filters.quotientfilter;
import org.apache.datasketches.common.SketchesArgumentException;
import org.testng.annotations.Test;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -308,4 +309,57 @@ public void expansion() {
assertTrue(positives < 6);
}

@Test
public void mergeEmpty() {
final QuotientFilter qf1 = new QuotientFilter(4, 3);
final QuotientFilter qf2 = new QuotientFilter(4, 3);
qf1.merge(qf2);

assertEquals(qf1.getLgQ(), 4);
assertEquals(qf1.getFingerprintLength(), 3);
assertEquals(qf1.getNumEntries(), 0);
}

@Test
public void merge() {
final QuotientFilter qf1 = new QuotientFilter(16, 13);
final QuotientFilter qf2 = new QuotientFilter(16, 13);
final int n = 50000;
for (int i = 0; i < n / 2; i++) {
qf1.insert(i);
qf1.insert(i + n / 2);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be qf2.insert(i + n/2)?
Otherwise there are not items in qf2 so we merge empty sketches as in the above test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that is a mistake

}
qf1.merge(qf2);

assertEquals(qf1.getNumExpansions(), 0);
assertTrue(qf1.getNumEntries() > n * 0.99); // allow a few hash collisions

// query the same keys
int positives = 0;
for (int i = 0; i < n; i++) { if (qf1.search(i)) { positives++; } }
assertEquals(positives, n);

// query novel keys
positives = 0;
for (int i = 0; i < n; i++) { if (qf1.search(i + n)) { positives++; } }
assertTrue(positives < 4);
}

@Test
public void mergeDifferentConfiguration() {
final QuotientFilter qf1 = new QuotientFilter(3, 4);
final QuotientFilter qf2 = new QuotientFilter(4, 3);
qf1.insert(4);
qf2.insert(4);
qf1.merge(qf2);
assertEquals(qf1.getNumEntries(), 1);
}

@Test(expectedExceptions = SketchesArgumentException.class)
public void mergeIncompatible() {
final QuotientFilter qf1 = new QuotientFilter(4, 4);
final QuotientFilter qf2 = new QuotientFilter(4, 3);
qf1.merge(qf2);
}

}
Loading