Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ class RocksDBKVEngineIterator implements AutoCloseable {
Snapshot snapshot,
byte[] startKey,
byte[] endKey) {
ReadOptions readOptions = new ReadOptions().setPinData(true);
this(db, cfHandle, snapshot, startKey, endKey, true);
}

RocksDBKVEngineIterator(RocksDB db,
ColumnFamilyHandle cfHandle,
Snapshot snapshot,
byte[] startKey,
byte[] endKey,
boolean fillCache) {
ReadOptions readOptions = new ReadOptions().setPinData(true).setFillCache(fillCache);
Slice lowerSlice = null;
if (startKey != null) {
lowerSlice = new Slice(startKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,16 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basekv.localengine.rocksdb;

import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
import static org.apache.bifromq.basekv.localengine.IKVEngine.DEFAULT_NS;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toMetaKey;
import static org.apache.bifromq.basekv.localengine.rocksdb.RocksDBKVSpace.deleteDir;
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;

import org.apache.bifromq.basekv.localengine.ISyncContext;
import org.apache.bifromq.basekv.localengine.KVEngineException;
import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
Expand All @@ -36,6 +33,11 @@
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.bifromq.basekv.localengine.IKVSpaceIterator;
import org.apache.bifromq.basekv.localengine.ISyncContext;
import org.apache.bifromq.basekv.localengine.KVEngineException;
import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
import org.apache.bifromq.basekv.proto.Boundary;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
Expand Down Expand Up @@ -138,6 +140,16 @@ public <T> T call(Supplier<T> supplier) {
};
}

@Override
protected IKVSpaceIterator doNewIterator() {
return new RocksDBKVSpaceIterator(db(), cfHandle(), null, Boundary.getDefaultInstance(), newRefresher(), false);
}

@Override
protected IKVSpaceIterator doNewIterator(Boundary subBoundary) {
return new RocksDBKVSpaceIterator(db(), cfHandle(), null, subBoundary, newRefresher(), false);
}

private record ClosableResources(
String id,
String cpId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@

package org.apache.bifromq.basekv.localengine.rocksdb;

import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.DATA_SECTION_END;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.DATA_SECTION_START;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.fromDataKey;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toDataKey;
import static org.apache.bifromq.basekv.utils.BoundaryUtil.endKeyBytes;
import static org.apache.bifromq.basekv.utils.BoundaryUtil.startKeyBytes;
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;

import com.google.protobuf.ByteString;
import java.lang.ref.Cleaner;
import org.apache.bifromq.basekv.localengine.IKVSpaceIterator;
import org.apache.bifromq.basekv.localengine.ISyncContext;
import org.apache.bifromq.basekv.proto.Boundary;
import com.google.protobuf.ByteString;
import java.lang.ref.Cleaner;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.Snapshot;
Expand All @@ -41,6 +41,7 @@ class RocksDBKVSpaceIterator implements IKVSpaceIterator {
private final RocksDBKVEngineIterator rocksItr;
private final ISyncContext.IRefresher refresher;
private final Cleaner.Cleanable onClose;

public RocksDBKVSpaceIterator(RocksDB db,
ColumnFamilyHandle cfHandle,
Boundary boundary,
Expand All @@ -53,11 +54,20 @@ public RocksDBKVSpaceIterator(RocksDB db,
Snapshot snapshot,
Boundary boundary,
ISyncContext.IRefresher refresher) {
this(db, cfHandle, snapshot, boundary, refresher, true);
}

public RocksDBKVSpaceIterator(RocksDB db,
ColumnFamilyHandle cfHandle,
Snapshot snapshot,
Boundary boundary,
ISyncContext.IRefresher refresher,
boolean fillCache) {
byte[] startKey = startKeyBytes(boundary);
byte[] endKey = endKeyBytes(boundary);
startKey = startKey != null ? toDataKey(startKey) : DATA_SECTION_START;
endKey = endKey != null ? toDataKey(endKey) : DATA_SECTION_END;
this.rocksItr = new RocksDBKVEngineIterator(db, cfHandle, snapshot, startKey, endKey);
this.rocksItr = new RocksDBKVEngineIterator(db, cfHandle, snapshot, startKey, endKey, fillCache);
this.refresher = refresher;
onClose = CLEANER.register(this, new State(rocksItr));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,24 @@

package org.apache.bifromq.basekv.localengine.rocksdb;

import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
import static java.util.Collections.singletonList;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.DATA_SECTION_END;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.DATA_SECTION_START;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toDataKey;
import static org.apache.bifromq.basekv.utils.BoundaryUtil.compare;
import static org.apache.bifromq.basekv.utils.BoundaryUtil.isValid;
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
import static java.util.Collections.singletonList;
import static org.rocksdb.SizeApproximationFlag.INCLUDE_FILES;
import static org.rocksdb.SizeApproximationFlag.INCLUDE_MEMTABLES;

import com.google.protobuf.ByteString;
import java.util.Optional;
import org.apache.bifromq.basekv.localengine.AbstractKVSpaceReader;
import org.apache.bifromq.basekv.localengine.IKVSpaceIterator;
import org.apache.bifromq.basekv.localengine.ISyncContext;
import org.apache.bifromq.basekv.localengine.KVEngineException;
import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
import org.apache.bifromq.basekv.proto.Boundary;
import com.google.protobuf.ByteString;
import java.util.Optional;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.Range;
import org.rocksdb.RocksDB;
Expand Down Expand Up @@ -86,12 +86,12 @@ protected final Optional<ByteString> doGet(ByteString key) {
}

@Override
protected final IKVSpaceIterator doNewIterator() {
protected IKVSpaceIterator doNewIterator() {
return new RocksDBKVSpaceIterator(db(), cfHandle(), Boundary.getDefaultInstance(), newRefresher());
}

@Override
protected final IKVSpaceIterator doNewIterator(Boundary subBoundary) {
protected IKVSpaceIterator doNewIterator(Boundary subBoundary) {
assert isValid(subBoundary);
return new RocksDBKVSpaceIterator(db(), cfHandle(), subBoundary, newRefresher());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,26 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basekv.localengine.rocksdb;

import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toDataKey;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toMetaKey;
import static org.apache.bifromq.basekv.utils.BoundaryUtil.isValid;
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;

import com.google.protobuf.ByteString;
import java.lang.ref.Cleaner;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.bifromq.basekv.localengine.AbstractKVSpaceReader;
import org.apache.bifromq.basekv.localengine.IKVSpaceIterator;
import org.apache.bifromq.basekv.localengine.ISyncContext;
import org.apache.bifromq.basekv.localengine.KVEngineException;
import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
import org.apache.bifromq.basekv.proto.Boundary;
import com.google.protobuf.ByteString;
import java.lang.ref.Cleaner;
import java.util.Optional;
import java.util.function.Supplier;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
Expand Down Expand Up @@ -119,8 +119,7 @@ protected Optional<ByteString> doGet(ByteString key) {

@Override
protected IKVSpaceIterator doNewIterator() {
return new RocksDBKVSpaceIterator(db, cfHandle, snapshot, Boundary.getDefaultInstance(), DUMMY_REFRESHER
);
return new RocksDBKVSpaceIterator(db, cfHandle, snapshot, Boundary.getDefaultInstance(), DUMMY_REFRESHER);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@

package org.apache.bifromq.basekv.localengine.rocksdb;

import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toMetaKey;
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toMetaKey;

import com.google.protobuf.ByteString;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.bifromq.basekv.localengine.IKVSpaceIterator;
import org.apache.bifromq.basekv.localengine.IKVSpaceMetadataWriter;
import org.apache.bifromq.basekv.localengine.IKVSpaceWriter;
import org.apache.bifromq.basekv.localengine.ISyncContext;
import org.apache.bifromq.basekv.localengine.KVEngineException;
import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
import org.apache.bifromq.basekv.proto.Boundary;
import com.google.protobuf.ByteString;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
Expand All @@ -42,6 +42,8 @@
class RocksDBKVSpaceWriter<E extends RocksDBKVEngine<E, T, C>, T extends
RocksDBKVSpace<E, T, C>, C extends RocksDBKVEngineConfigurator<C>>
extends RocksDBKVSpaceReader implements IKVSpaceWriter {
private static final long MIGRATION_FLUSH_BYTES = 4L * 1024 * 1024;
private static final int MIGRATION_FLUSH_OPS = 4096;
private final RocksDB db;
private final ColumnFamilyHandle cfHandle;
private final ISyncContext syncContext;
Expand Down Expand Up @@ -135,12 +137,13 @@ public IKVSpaceWriter clear(Boundary boundary) {
public IKVSpaceMetadataWriter migrateTo(String targetSpaceId, Boundary boundary) {
try {
RocksDBKVSpace<?, ?, ?> targetKVSpace = engine.createIfMissing(targetSpaceId);
IKVSpaceWriter targetKVSpaceWriter = targetKVSpace.toWriter();
RocksDBKVSpaceWriter<?, ?, ?> targetKVSpaceWriter = (RocksDBKVSpaceWriter<?, ?, ?>) targetKVSpace.toWriter();
// move data
int c = 0;
try (IKVSpaceIterator itr = newIterator(boundary)) {
for (itr.seekToFirst(); itr.isValid(); itr.next()) {
targetKVSpaceWriter.put(itr.key(), itr.value());
targetKVSpaceWriter.flushIfNeededForMigration();
c++;
}
}
Expand All @@ -163,6 +166,7 @@ public IKVSpaceMetadataWriter migrateFrom(String fromSpaceId, Boundary boundary)
try (IKVSpaceIterator itr = sourceKVSpace.newIterator(boundary)) {
for (itr.seekToFirst(); itr.isValid(); itr.next()) {
helper.put(cfHandle(), itr.key(), itr.value());
flushIfNeededForMigration();
}
}
// clear moved data in right range
Expand All @@ -180,7 +184,7 @@ public void done() {
opMeters.writeBatchSizeSummary.record(helper.count());
helper.done();
writeStatsRecorder.stop();
} catch (RocksDBException e) {
} catch (Throwable e) {
logger.error("Write Batch commit failed", e);
throw new KVEngineException("Batch commit failed", e);
}
Expand Down Expand Up @@ -217,6 +221,14 @@ protected ColumnFamilyHandle cfHandle() {
return cfHandle;
}

private void flushIfNeededForMigration() {
// ensure metadata changes are flushed atomically
if (!helper.hasPendingMetadata()
&& (helper.count() >= MIGRATION_FLUSH_OPS || helper.dataSize() >= MIGRATION_FLUSH_BYTES)) {
helper.flush();
}
}

@Override
protected ISyncContext.IRefresher newRefresher() {
return syncContext.refresher();
Expand Down
Loading
Loading