diff --git a/src/main/java/org/apache/datasketches/theta/AnotBimpl.java b/src/main/java/org/apache/datasketches/theta/AnotBimpl.java index e7b2c99eb..cc076fd85 100644 --- a/src/main/java/org/apache/datasketches/theta/AnotBimpl.java +++ b/src/main/java/org/apache/datasketches/theta/AnotBimpl.java @@ -20,8 +20,11 @@ package org.apache.datasketches.theta; import static org.apache.datasketches.common.Util.exactLog2OfLong; -import static org.apache.datasketches.thetacommon.HashOperations.convertToHashTable; +import static org.apache.datasketches.thetacommon.HashOperations.checkThetaCorruption; +import static org.apache.datasketches.thetacommon.HashOperations.continueCondition; import static org.apache.datasketches.thetacommon.HashOperations.hashSearch; +import static org.apache.datasketches.thetacommon.HashOperations.hashSearchOrInsert; +import static org.apache.datasketches.thetacommon.HashOperations.minLgHashTableSize; import java.util.Arrays; @@ -124,7 +127,7 @@ public CompactSketch aNotB(final Sketch skA, final Sketch skB, final boolean dst if (skB.isEmpty()) { return skA.compact(dstOrdered, dstMem); - } + } ThetaUtil.checkSeedHashes(skB.getSeedHash(), seedHash_); //Both skA & skB are not empty @@ -162,14 +165,12 @@ private static long[] getResultHashArr( //returns a new array final long[] hashArrA, final Sketch skB) { - //Rebuild/get hashtable of skB + // Rebuild or get hashtable of skB final long[] hashTableB; //read only - final long[] thetaCache = skB.getCache(); - final int countB = skB.getRetainedEntries(true); if (skB instanceof CompactSketch) { - hashTableB = convertToHashTable(thetaCache, countB, minThetaLong, ThetaUtil.REBUILD_THRESHOLD); + hashTableB = convertToHashTable(skB, minThetaLong, ThetaUtil.REBUILD_THRESHOLD); } else { - hashTableB = thetaCache; + hashTableB = skB.getCache(); } //build temporary result arrays of skA @@ -191,6 +192,25 @@ private static long[] getResultHashArr( //returns a new array return Arrays.copyOfRange(tmpHashArrA, 0, 
nonMatches); } + private static long[] convertToHashTable( + final Sketch sketch, + final long thetaLong, + final double rebuildThreshold) { + final int lgArrLongs = minLgHashTableSize(sketch.getRetainedEntries(true), rebuildThreshold); + final int arrLongs = 1 << lgArrLongs; + final long[] hashTable = new long[arrLongs]; + checkThetaCorruption(thetaLong); + final HashIterator it = sketch.iterator(); + while (it.next()) { + final long hash = it.get(); + if (continueCondition(thetaLong, hash) ) { + continue; + } + hashSearchOrInsert(hashTable, lgArrLongs, hash); + } + return hashTable; + } + private void reset() { thetaLong_ = Long.MAX_VALUE; empty_ = true; diff --git a/src/main/java/org/apache/datasketches/theta/CompactOperations.java b/src/main/java/org/apache/datasketches/theta/CompactOperations.java index a8066314d..2b52f59fa 100644 --- a/src/main/java/org/apache/datasketches/theta/CompactOperations.java +++ b/src/main/java/org/apache/datasketches/theta/CompactOperations.java @@ -161,7 +161,7 @@ static CompactSketch memoryToCompact( final long hash = srcMem.getLong(srcPreLongs << 3); final SingleItemSketch sis = new SingleItemSketch(hash, srcSeedHash); if (dstMem != null) { - dstMem.putByteArray(0, sis.toByteArray(),0, 16); + dstMem.putByteArray(0, sis.toByteArray(), 0, 16); return new DirectCompactSketch(dstMem); } else { //heap return sis; diff --git a/src/main/java/org/apache/datasketches/theta/CompactSketch.java b/src/main/java/org/apache/datasketches/theta/CompactSketch.java index 1426368f1..688ad2746 100644 --- a/src/main/java/org/apache/datasketches/theta/CompactSketch.java +++ b/src/main/java/org/apache/datasketches/theta/CompactSketch.java @@ -32,6 +32,7 @@ import static org.apache.datasketches.theta.PreambleUtil.extractEntryBitsV4; import static org.apache.datasketches.theta.PreambleUtil.extractNumEntriesBytesV4; import static org.apache.datasketches.theta.PreambleUtil.extractThetaLongV4; +import static 
org.apache.datasketches.theta.PreambleUtil.wholeBytesToHoldBits; import static org.apache.datasketches.theta.SingleItemSketch.otherCheckForSingleItem; import org.apache.datasketches.common.Family; @@ -189,7 +190,8 @@ private static CompactSketch wrap(final Memory srcMem, final long seed, final bo if (serVer == 4) { // not wrapping the compressed format since currently we cannot take advantage of // decompression during iteration because set operations reach into memory directly - return heapifyV4(srcMem, seed, enforceSeed); + return DirectCompactCompressedSketch.wrapInstance(srcMem, + enforceSeed ? seedHash : (short) extractSeedHash(srcMem)); } else if (serVer == 3) { if (PreambleUtil.isEmptyFlag(srcMem)) { @@ -274,10 +276,6 @@ private int computeMinLeadingZeros() { return Long.numberOfLeadingZeros(ored); } - private static int wholeBytesToHoldBits(final int bits) { - return (bits >>> 3) + ((bits & 7) > 0 ? 1 : 0); - } - private byte[] toByteArrayV4() { final int preambleLongs = isEstimationMode() ? 
2 : 1; final int entryBits = 64 - computeMinLeadingZeros(); @@ -286,8 +284,8 @@ private byte[] toByteArrayV4() { // store num_entries as whole bytes since whole-byte blocks will follow (most probably) final int numEntriesBytes = wholeBytesToHoldBits(32 - Integer.numberOfLeadingZeros(getRetainedEntries())); - final int size = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits); - final byte[] bytes = new byte[size]; + final int sizeBytes = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits); + final byte[] bytes = new byte[sizeBytes]; final WritableMemory mem = WritableMemory.writableWrap(bytes); int offsetBytes = 0; mem.putByte(offsetBytes++, (byte) preambleLongs); @@ -334,12 +332,10 @@ private byte[] toByteArrayV4() { private static CompactSketch heapifyV4(final Memory srcMem, final long seed, final boolean enforceSeed) { final int preLongs = extractPreLongs(srcMem); - final int flags = extractFlags(srcMem); final int entryBits = extractEntryBitsV4(srcMem); final int numEntriesBytes = extractNumEntriesBytesV4(srcMem); final short seedHash = (short) extractSeedHash(srcMem); - final boolean isEmpty = (flags & EMPTY_FLAG_MASK) > 0; - if (enforceSeed && !isEmpty) { PreambleUtil.checkMemorySeedHash(srcMem, seed); } + if (enforceSeed) { PreambleUtil.checkMemorySeedHash(srcMem, seed); } int offsetBytes = 8; long theta = Long.MAX_VALUE; if (preLongs > 1) { @@ -374,7 +370,7 @@ private static CompactSketch heapifyV4(final Memory srcMem, final long seed, fin entries[i] += previous; previous = entries[i]; } - return new HeapCompactSketch(entries, isEmpty, seedHash, numEntries, theta, true); + return new HeapCompactSketch(entries, false, seedHash, numEntries, theta, true); } } diff --git a/src/main/java/org/apache/datasketches/theta/DirectCompactCompressedSketch.java b/src/main/java/org/apache/datasketches/theta/DirectCompactCompressedSketch.java new file mode 100644 index 000000000..d7e05ca2e --- /dev/null +++ 
b/src/main/java/org/apache/datasketches/theta/DirectCompactCompressedSketch.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datasketches.theta; + +import static org.apache.datasketches.theta.PreambleUtil.extractEntryBitsV4; +import static org.apache.datasketches.theta.PreambleUtil.extractNumEntriesBytesV4; +import static org.apache.datasketches.theta.PreambleUtil.extractPreLongs; +import static org.apache.datasketches.theta.PreambleUtil.extractSeedHash; +import static org.apache.datasketches.theta.PreambleUtil.extractThetaLongV4; +import static org.apache.datasketches.theta.PreambleUtil.wholeBytesToHoldBits; + +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.datasketches.thetacommon.ThetaUtil; + +/** + * An off-heap (Direct), compact, compressed, read-only sketch. It is not empty, not a single item and ordered. + * + *

<p>This sketch can only be associated with a Serialization Version 4 format binary image.</p>

+ * + *

<p>This implementation uses data in a given Memory that is owned and managed by the caller. + * This Memory can be off-heap, which if managed properly will greatly reduce the need for + * the JVM to perform garbage collection.</p>

+ */ +class DirectCompactCompressedSketch extends DirectCompactSketch { + /** + * Construct this sketch with the given memory. + * @param mem Read-only Memory object. + */ + DirectCompactCompressedSketch(final Memory mem) { + super(mem); + } + + /** + * Wraps the given Memory, which must be a SerVer 4 compressed CompactSketch image. + * Must check the validity of the Memory before calling. + * @param srcMem See Memory + * @param seedHash The update seedHash. + * See Seed Hash. + * @return this sketch + */ + static DirectCompactCompressedSketch wrapInstance(final Memory srcMem, final short seedHash) { + ThetaUtil.checkSeedHashes((short) extractSeedHash(srcMem), seedHash); + return new DirectCompactCompressedSketch(srcMem); + } + + //Sketch Overrides + + @Override + public CompactSketch compact(final boolean dstOrdered, final WritableMemory dstMem) { + if (dstMem != null) { + mem_.copyTo(0, dstMem, 0, getCurrentBytes()); + return new DirectCompactSketch(dstMem); + } + return CompactSketch.heapify(mem_); + } + + @Override + public int getCurrentBytes() { + final int preLongs = extractPreLongs(mem_); + final int entryBits = extractEntryBitsV4(mem_); + final int numEntriesBytes = extractNumEntriesBytesV4(mem_); + return preLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(getRetainedEntries() * entryBits); + } + + private static final int START_PACKED_DATA_EXACT_MODE = 8; + private static final int START_PACKED_DATA_ESTIMATION_MODE = 16; + + @Override + public int getRetainedEntries(final boolean valid) { //compact is always valid + // number of entries is stored using variable length encoding + // most significant bytes with all zeros are not stored + // one byte in the preamble has the number of non-zero bytes used + final int preLongs = extractPreLongs(mem_); // if > 1 then the second long has theta + final int numEntriesBytes = extractNumEntriesBytesV4(mem_); + int offsetBytes = preLongs > 1 ? 
START_PACKED_DATA_ESTIMATION_MODE : START_PACKED_DATA_EXACT_MODE; + int numEntries = 0; + for (int i = 0; i < numEntriesBytes; i++) { + numEntries |= Byte.toUnsignedInt(mem_.getByte(offsetBytes++)) << (i << 3); + } + return numEntries; + } + + @Override + public long getThetaLong() { + final int preLongs = extractPreLongs(mem_); + return (preLongs > 1) ? extractThetaLongV4(mem_) : Long.MAX_VALUE; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public boolean isOrdered() { + return true; + } + + @Override + public HashIterator iterator() { + return new MemoryCompactCompressedHashIterator( + mem_, + (extractPreLongs(mem_) > 1 ? 16 : 8) + extractNumEntriesBytesV4(mem_), + extractEntryBitsV4(mem_), + getRetainedEntries() + ); + } + + //restricted methods + + @Override + long[] getCache() { + final int numEntries = getRetainedEntries(); + final long[] cache = new long[numEntries]; + int i = 0; + HashIterator it = iterator(); + while (it.next()) { + cache[i++] = it.get(); + } + return cache; + } +} diff --git a/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java b/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java index 0f69ec3c2..1714d2161 100644 --- a/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java +++ b/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java @@ -86,11 +86,7 @@ public int getCurrentBytes() { @Override public double getEstimate() { - if (otherCheckForSingleItem(mem_)) { return 1; } - final int preLongs = extractPreLongs(mem_); - final int curCount = (preLongs == 1) ? 0 : extractCurCount(mem_); - final long thetaLong = (preLongs > 2) ? 
extractThetaLong(mem_) : Long.MAX_VALUE; - return Sketch.estimate(thetaLong, curCount); + return Sketch.estimate(getThetaLong(), getRetainedEntries()); } @Override @@ -142,10 +138,8 @@ public HashIterator iterator() { @Override public byte[] toByteArray() { - final int curCount = getRetainedEntries(true); - checkIllegalCurCountAndEmpty(isEmpty(), curCount); - final int preLongs = extractPreLongs(mem_); - final int outBytes = (curCount + preLongs) << 3; + checkIllegalCurCountAndEmpty(isEmpty(), getRetainedEntries()); + final int outBytes = getCurrentBytes(); final byte[] byteArrOut = new byte[outBytes]; mem_.getByteArray(0, byteArrOut, 0, outBytes); return byteArrOut; diff --git a/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java b/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java index fc81d1124..b1be73c74 100644 --- a/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java +++ b/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java @@ -288,7 +288,7 @@ else if (curCount_ < 0 && sketchInEntries > 0) { else { //On the heap, allocate a HT hashTable_ = new long[1 << lgArrLongs_]; } - moveDataToTgt(sketchIn.getCache(), curCount_); + moveDataToTgt(sketchIn); } //end of state 5 //state 7 @@ -434,8 +434,6 @@ long getThetaLong() { private void performIntersect(final Sketch sketchIn) { // curCount and input data are nonzero, match against HT assert curCount_ > 0 && !empty_; - final long[] cacheIn = sketchIn.getCache(); - final int arrLongsIn = cacheIn.length; final long[] hashTable; if (wmem_ != null) { final int htLen = 1 << lgArrLongs_; @@ -448,27 +446,17 @@ private void performIntersect(final Sketch sketchIn) { final long[] matchSet = new long[ min(curCount_, sketchIn.getRetainedEntries(true)) ]; int matchSetCount = 0; - if (sketchIn.isOrdered()) { - //ordered compact, which enables early stop - for (int i = 0; i < arrLongsIn; i++ ) { - final long hashIn = cacheIn[i]; - //if (hashIn <= 0L) continue; //<= 0 should not 
happen - if (hashIn >= thetaLong_) { - break; //early stop assumes that hashes in input sketch are ordered! - } + final boolean isOrdered = sketchIn.isOrdered(); + final HashIterator it = sketchIn.iterator(); + while (it.next()) { + final long hashIn = it.get(); + if (hashIn < thetaLong_) { final int foundIdx = hashSearch(hashTable, lgArrLongs_, hashIn); - if (foundIdx == -1) { continue; } - matchSet[matchSetCount++] = hashIn; - } - } - else { - //either unordered compact or hash table - for (int i = 0; i < arrLongsIn; i++ ) { - final long hashIn = cacheIn[i]; - if (hashIn <= 0L || hashIn >= thetaLong_) { continue; } - final int foundIdx = hashSearch(hashTable, lgArrLongs_, hashIn); - if (foundIdx == -1) { continue; } - matchSet[matchSetCount++] = hashIn; + if (foundIdx != -1) { + matchSet[matchSetCount++] = hashIn; + } + } else { + if (isOrdered) { break; } // early stop } } //reduce effective array size to minimum @@ -515,6 +503,32 @@ private void moveDataToTgt(final long[] arr, final int count) { assert tmpCnt == count : "Intersection Count Check: got: " + tmpCnt + ", expected: " + count; } + private void moveDataToTgt(final Sketch sketch) { + int count = sketch.getRetainedEntries(); + int tmpCnt = 0; + if (wmem_ != null) { //Off Heap puts directly into mem + final int preBytes = CONST_PREAMBLE_LONGS << 3; + final int lgArrLongs = lgArrLongs_; + final long thetaLong = thetaLong_; + HashIterator it = sketch.iterator(); + while (it.next()) { + final long hash = it.get(); + if (continueCondition(thetaLong, hash)) { continue; } + hashInsertOnlyMemory(wmem_, lgArrLongs, hash, preBytes); + tmpCnt++; + } + } else { //On Heap. 
Assumes HT exists and is large enough + HashIterator it = sketch.iterator(); + while (it.next()) { + final long hash = it.get(); + if (continueCondition(thetaLong_, hash)) { continue; } + hashInsertOnly(hashTable_, lgArrLongs_, hash); + tmpCnt++; + } + } + assert tmpCnt == count : "Intersection Count Check: got: " + tmpCnt + ", expected: " + count; + } + private void hardReset() { resetCommon(); if (wmem_ != null) { diff --git a/src/main/java/org/apache/datasketches/theta/MemoryCompactCompressedHashIterator.java b/src/main/java/org/apache/datasketches/theta/MemoryCompactCompressedHashIterator.java new file mode 100644 index 000000000..b743302a5 --- /dev/null +++ b/src/main/java/org/apache/datasketches/theta/MemoryCompactCompressedHashIterator.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.datasketches.theta; + +import static org.apache.datasketches.theta.PreambleUtil.wholeBytesToHoldBits; + +import org.apache.datasketches.memory.Memory; + +/** + * @author Lee Rhodes + */ +class MemoryCompactCompressedHashIterator implements HashIterator { + private Memory mem; + private int offset; + private int entryBits; + private int numEntries; + private int index; + private long previous; + private int offsetBits; + private long[] buffer; + private byte[] bytes; + private boolean isBlockMode; + private boolean isFirstUnpack1; + + MemoryCompactCompressedHashIterator( + final Memory mem, + final int offset, + final int entryBits, + final int numEntries + ) { + this.mem = mem; + this.offset = offset; + this.entryBits = entryBits; + this.numEntries = numEntries; + index = -1; + previous = 0; + offsetBits = 0; + buffer = new long[8]; + bytes = new byte[entryBits]; + isBlockMode = numEntries >= 8; + isFirstUnpack1 = true; + } + + @Override + public long get() { + return buffer[index & 7]; + } + + @Override + public boolean next() { + if (++index == numEntries) { return false; } + if (isBlockMode) { + if ((index & 7) == 0) { + if (numEntries - index >= 8) { + unpack8(); + } else { + isBlockMode = false; + unpack1(); + } + } + } else { + unpack1(); + } + return true; + } + + private void unpack1() { + if (isFirstUnpack1) { + mem.getByteArray(offset, bytes, 0, wholeBytesToHoldBits((numEntries - index) * entryBits)); + offset = 0; + isFirstUnpack1 = false; + } + final int i = index & 7; + BitPacking.unpackBits(buffer, i, entryBits, bytes, offset, offsetBits); + offset += (offsetBits + entryBits) >>> 3; + offsetBits = (offsetBits + entryBits) & 7; + buffer[i] += previous; + previous = buffer[i]; + } + + private void unpack8() { + mem.getByteArray(offset, bytes, 0, entryBits); + BitPacking.unpackBitsBlock8(buffer, 0, bytes, 0, entryBits); + offset += entryBits; + for (int i = 0; i < 8; i++) { + buffer[i] += previous; + previous = buffer[i]; + } + 
} +} diff --git a/src/main/java/org/apache/datasketches/theta/PreambleUtil.java b/src/main/java/org/apache/datasketches/theta/PreambleUtil.java index e1d9262e6..ec0bc1268 100644 --- a/src/main/java/org/apache/datasketches/theta/PreambleUtil.java +++ b/src/main/java/org/apache/datasketches/theta/PreambleUtil.java @@ -524,4 +524,7 @@ private static void throwNotBigEnough(final long cap, final int required) { + ", Required: " + required); } + static int wholeBytesToHoldBits(final int bits) { + return (bits >>> 3) + ((bits & 7) > 0 ? 1 : 0); + } } diff --git a/src/main/java/org/apache/datasketches/theta/Sketch.java b/src/main/java/org/apache/datasketches/theta/Sketch.java index 89618bc23..6583e2dbf 100644 --- a/src/main/java/org/apache/datasketches/theta/Sketch.java +++ b/src/main/java/org/apache/datasketches/theta/Sketch.java @@ -451,9 +451,8 @@ public String toString(final boolean sketchSummary, final boolean dataDetail, fi final boolean hexMode) { final StringBuilder sb = new StringBuilder(); - final long[] cache = getCache(); int nomLongs = 0; - int arrLongs = cache.length; + int arrLongs = 0; float p = 0; int rf = 0; final boolean updateSketch = this instanceof UpdateSketch; @@ -473,12 +472,10 @@ public String toString(final boolean sketchSummary, final boolean dataDetail, fi final int w = width > 0 ? 
width : 8; // default is 8 wide if (curCount > 0) { sb.append("### SKETCH DATA DETAIL"); - for (int i = 0, j = 0; i < arrLongs; i++ ) { - final long h; - h = cache[i]; - if (h <= 0 || h >= thetaLong) { - continue; - } + HashIterator it = iterator(); + int j = 0; + while (it.next()) { + final long h = it.get(); if (j % w == 0) { sb.append(LS).append(String.format(" %6d", j + 1)); } diff --git a/src/main/java/org/apache/datasketches/theta/UnionImpl.java b/src/main/java/org/apache/datasketches/theta/UnionImpl.java index bac05de74..d5ae6071a 100644 --- a/src/main/java/org/apache/datasketches/theta/UnionImpl.java +++ b/src/main/java/org/apache/datasketches/theta/UnionImpl.java @@ -22,7 +22,6 @@ import static java.lang.Math.min; import static org.apache.datasketches.theta.PreambleUtil.COMPACT_FLAG_MASK; import static org.apache.datasketches.theta.PreambleUtil.ORDERED_FLAG_MASK; -import static org.apache.datasketches.theta.PreambleUtil.PREAMBLE_LONGS_BYTE; import static org.apache.datasketches.theta.PreambleUtil.UNION_THETA_LONG; import static org.apache.datasketches.theta.PreambleUtil.clearEmpty; import static org.apache.datasketches.theta.PreambleUtil.extractCurCount; @@ -328,38 +327,14 @@ public void union(final Sketch sketchIn) { unionThetaLong_ = min(min(unionThetaLong_, sketchIn.getThetaLong()), gadget_.getThetaLong()); //Theta rule unionEmpty_ = false; - final int curCountIn = sketchIn.getRetainedEntries(true); - if (curCountIn > 0) { - if (sketchIn.isOrdered() && (sketchIn instanceof CompactSketch)) { //Use early stop - //Ordered, thus compact - if (sketchIn.hasMemory()) { - final Memory skMem = sketchIn.getMemory(); - final int preambleLongs = skMem.getByte(PREAMBLE_LONGS_BYTE) & 0X3F; - for (int i = 0; i < curCountIn; i++ ) { - final int offsetBytes = preambleLongs + i << 3; - final long hashIn = skMem.getLong(offsetBytes); - if (hashIn >= unionThetaLong_) { break; } // "early stop" - gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed - } - 
} - else { //sketchIn is on the Java Heap or has array - final long[] cacheIn = sketchIn.getCache(); //not a copy! - for (int i = 0; i < curCountIn; i++ ) { - final long hashIn = cacheIn[i]; - if (hashIn >= unionThetaLong_) { break; } // "early stop" - gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed - } - } - } //End ordered, compact - else { //either not-ordered compact or Hash Table form. A HT may have dirty values. - final long[] cacheIn = sketchIn.getCache(); //if off-heap this will be a copy - final int arrLongs = cacheIn.length; - for (int i = 0, c = 0; i < arrLongs && c < curCountIn; i++ ) { - final long hashIn = cacheIn[i]; - if (hashIn <= 0L || hashIn >= unionThetaLong_) { continue; } //rejects dirty values - gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed - c++; //ensures against invalid state inside the incoming sketch - } + final boolean isOrdered = sketchIn.isOrdered(); + final HashIterator it = sketchIn.iterator(); + while (it.next()) { + final long hash = it.get(); + if (hash < unionThetaLong_ && hash < gadget_.getThetaLong()) { + gadget_.hashUpdate(hash); // backdoor update, hash function is bypassed + } else { + if (isOrdered) { break; } } } unionThetaLong_ = min(unionThetaLong_, gadget_.getThetaLong()); //Theta rule with gadget @@ -379,11 +354,8 @@ public void union(final Memory skMem) { final int fam = extractFamilyID(skMem); if (serVer == 4) { // compressed ordered compact - // performance can be improved by decompression while performing the union - // potentially only partial decompression might be needed ThetaUtil.checkSeedHashes(expectedSeedHash_, (short) extractSeedHash(skMem)); - final CompactSketch csk = CompactSketch.wrap(skMem); - union(csk); + union(CompactSketch.wrap(skMem)); return; } if (serVer == 3) { //The OpenSource sketches (Aug 4, 2015) starts with serVer = 3 @@ -396,16 +368,13 @@ public void union(final Memory skMem) { } if (serVer == 2) { //older Sketch, which is compact and 
ordered ThetaUtil.checkSeedHashes(expectedSeedHash_, (short)extractSeedHash(skMem)); - final CompactSketch csk = ForwardCompatibility.heapify2to3(skMem, expectedSeedHash_); - union(csk); + union(ForwardCompatibility.heapify2to3(skMem, expectedSeedHash_)); return; } if (serVer == 1) { //much older Sketch, which is compact and ordered, no seedHash - final CompactSketch csk = ForwardCompatibility.heapify1to3(skMem, expectedSeedHash_); - union(csk); + union(ForwardCompatibility.heapify1to3(skMem, expectedSeedHash_)); return; } - throw new SketchesArgumentException("SerVer is unknown: " + serVer); }