
Commit 7098b0f

[server][common][controller][vpj] Materialized view projection and filter support
Add projection and filtering support for materialized views (MV) so that unwanted data is not delivered to view consumers. Projection is enabled by setting projection fields in the materialized view parameters; filtering is enabled by setting filter-by fields. The two features can be enabled separately or together. If enabled together, the filter-by fields are automatically included in the projecting fields. An example MV configuration to illustrate the idea:

- Record containing fields: {a, b, c, d, e}
- Projecting fields: {b, c}
- Filtering fields: {a}

The only filtering option for now is to skip the record if none of the filter-by fields changed. Filtering is also applied only during hybrid ingestion, since a change filter does not make sense for a batch push. With the above setup, all batch data is projected and written to the MV ({a, b, c}). An RT update (full PUT or UPDATE) is projected and written to the MV ({a, b, c}) only if the value of field (a) differs from the old value. All DELETE events are written to the MV (no filtering).

Achieving this behavior required several changes:

1. Previously we used pub-sub message headers to perform forwarding in order to handle chunks during NR pass-through in remote regions. That strategy does not work with projection, because projecting batch data in remote regions requires the remote partition leaders to assemble the chunks during NR pass-through. We replace the forwarding strategy with InMemoryChunkAssembler. To ensure leaders don't resume in between chunks, we also buffer and delay writing the chunks to the drainer until we have a fully assembled record and have produced it to the view topic(s).

2. Added enforcement in the controller to ensure view configs are immutable. The projection schema is generated when a new materialized view is added and is stored with the view config. Since there can be only one schema version per view, the znode size should be manageable with compression; if this becomes a concern, we can store the schema separately or generate it on the fly. We also verify that the filter-by fields and projection fields exist in the latest superset or value schema and have default values.

3. Projection is performed in ComplexVeniceWriter as part of complexPut, so both VPJ and leaders use the same code for projection. Filtering is performed in MaterializedViewWriter, since the current change-filter offering applies only to hybrid writes.
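To make the configuration concrete, here is a minimal sketch of the example above. The parameter key names and the plain string-map shape are illustrative assumptions, not the exact constants introduced by this commit:

```java
import java.util.HashMap;
import java.util.Map;

public final class MaterializedViewParamsExample {
  // Hypothetical sketch: record fields {a, b, c, d, e}, projecting {b, c},
  // filtering on {a}. The key names below are assumptions for illustration.
  public static Map<String, String> exampleViewParams() {
    Map<String, String> viewParams = new HashMap<>();
    viewParams.put("materialized.view.projection.fields", "b,c"); // project {b, c}
    viewParams.put("materialized.view.filter.by.fields", "a");    // skip-if-unchanged filter on {a}
    // Because both are set, "a" is folded into the projecting fields
    // automatically, so MV records carry {a, b, c}.
    return viewParams;
  }
}
```

And a minimal sketch of the skip-if-unchanged semantics for hybrid writes (the class and method are illustrative, not the actual MaterializedViewWriter code; DELETE events bypass this check entirely):

```java
import java.util.List;
import java.util.Objects;
import org.apache.avro.generic.GenericRecord;

public final class ChangeFilterSketch {
  /** Returns true if an RT PUT/UPDATE result should be written to the view topic. */
  public static boolean shouldWrite(GenericRecord oldValue, GenericRecord newValue, List<String> filterByFields) {
    if (oldValue == null) {
      return true; // no old value to compare against: write through
    }
    for (String field : filterByFields) {
      if (!Objects.equals(oldValue.get(field), newValue.get(field))) {
        return true; // at least one filter-by field changed
      }
    }
    return false; // none of the filter-by fields changed: skip
  }
}
```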
1 parent 37b5281 · commit 7098b0f

28 files changed: +980 −293 lines

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java

Lines changed: 6 additions & 3 deletions

```diff
@@ -104,7 +104,7 @@
 public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsumer<K, V> {
   private static final Logger LOGGER = LogManager.getLogger(VeniceChangelogConsumerImpl.class);
   private static final int MAX_SUBSCRIBE_RETRIES = 5;
-  private static final String ROCKSDB_BUFFER_FOLDER = "rocksdb-chunk-buffer";
+  private static final String ROCKSDB_BUFFER_FOLDER_PREFIX = "rocksdb-chunk-buffer-";
   protected long subscribeTime = Long.MAX_VALUE;

   protected final ReadWriteLock subscriptionLock = new ReentrantReadWriteLock();
@@ -191,8 +191,11 @@ public VeniceChangelogConsumerImpl(
       throw new VeniceException("bootstrapFileSystemPath must be configured for consuming view store: " + storeName);
     }
     // Create a new folder in user provided path so if the path contains other important files we don't drop it.
-    rocksDBBufferProperties
-        .put(DATA_BASE_PATH, RocksDBUtils.composeStoreDbDir(rocksDBBufferPath, ROCKSDB_BUFFER_FOLDER));
+    rocksDBBufferProperties.put(
+        DATA_BASE_PATH,
+        RocksDBUtils.composeStoreDbDir(
+            rocksDBBufferPath,
+            ROCKSDB_BUFFER_FOLDER_PREFIX + changelogClientConfig.getConsumerName()));
     // These properties are required to build a VeniceServerConfig but is never used by RocksDBStorageEngineFactory.
     // Instead of setting these configs, we could refactor RocksDBStorageEngineFactory to take a more generic config.
     rocksDBBufferProperties.put(CLUSTER_NAME, "");
```
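A side note on the change above: previously every changelog consumer sharing a bootstrapFileSystemPath used the same "rocksdb-chunk-buffer" folder; suffixing the consumer name gives each consumer its own buffer directory. A quick illustration (consumer names, base path, and the import path are assumed; composeStoreDbDir is used as in the diff):

```java
import com.linkedin.davinci.store.rocksdb.RocksDBUtils;

public final class BufferPathExample {
  public static void main(String[] args) {
    String basePath = "/tmp/venice-bootstrap"; // assumed user-provided path
    // Each named consumer now buffers chunks in its own directory, so two
    // consumers sharing basePath no longer collide:
    String dirA = RocksDBUtils.composeStoreDbDir(basePath, "rocksdb-chunk-buffer-consumerA");
    String dirB = RocksDBUtils.composeStoreDbDir(basePath, "rocksdb-chunk-buffer-consumerB");
    System.out.println(dirA);
    System.out.println(dirB);
  }
}
```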

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java

Lines changed: 12 additions & 9 deletions

```diff
@@ -71,6 +71,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -456,7 +457,8 @@ private PubSubMessageProcessedResult processActiveActiveMessage(
             consumerRecord.getTopicPartition(),
             valueManifestContainer,
             beforeProcessingBatchRecordsTimestampMs));
-    if (hasChangeCaptureView || (hasComplexVenicePartitionerMaterializedView && msgType == MessageType.DELETE)) {
+    if (hasChangeCaptureView || hasFilterByFieldsMaterializedView
+        || (hasComplexVenicePartitionerMaterializedView && msgType == MessageType.DELETE)) {
      /**
       * Since this function will update the transient cache before writing the view, and if there is
       * a change capture view writer, we need to lookup first, otherwise the transient cache will be populated
@@ -655,8 +657,9 @@ protected void processMessageAndMaybeProduceToKafka(
     // following function
     // call in this context much less obtrusive, however, it implies that all views can only work for AA stores

-    // Write to views
-    Runnable produceToVersionTopic = () -> producePutOrDeleteToKafka(
+    // Write to views. In A/A ingestion we never need to delay VT writes. Using local variables is sufficient to
+    // define the produceToVersionTopic consumer.
+    Consumer<PubSubMessageProcessedResultWrapper> produceToVersionTopic = (ignored) -> producePutOrDeleteToKafka(
         mergeConflictResultWrapper,
         partitionConsumptionState,
         keyBytes,
@@ -676,25 +679,25 @@
       ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
       int oldValueSchemaId =
           oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
-      Lazy<GenericRecord> valueProvider = mergeConflictResultWrapper.getValueProvider();
       // The helper function takes in a BiFunction but the parameter for view partition set will never be used and
       // always null for A/A ingestion of the RT topic.
       queueUpVersionTopicWritesWithViewWriters(
           partitionConsumptionState,
-          (viewWriter, ignored) -> viewWriter.processRecord(
+          (viewWriter) -> viewWriter.processRecord(
              mergeConflictResultWrapper.getUpdatedValueBytes(),
              oldValueBB,
              keyBytes,
              mergeConflictResult.getValueSchemaId(),
              oldValueSchemaId,
              mergeConflictResult.getRmdRecord(),
-              valueProvider),
-          null,
-          produceToVersionTopic);
+              mergeConflictResultWrapper.getValueProvider(),
+              mergeConflictResultWrapper.getDeserializedOldValueProvider()),
+          produceToVersionTopic,
+          Collections.singletonList(consumerRecordWrapper));
     } else {
       // This function may modify the original record in KME and it is unsafe to use the payload from KME directly
       // after this call.
-      produceToVersionTopic.run();
+      produceToVersionTopic.accept(consumerRecordWrapper);
     }
   }
 }
```
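The switch above from Runnable to Consumer<PubSubMessageProcessedResultWrapper> supports the buffer-and-delay behavior from change 1: leader paths that hold chunks back until a record is fully assembled and produced to the view topics can then flush the exact consumed records to the version topic, while A/A ingestion simply passes the single current record. A minimal sketch of that complete-views-then-flush pattern (names and signature are assumptions, not the actual queueUpVersionTopicWritesWithViewWriters helper):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public final class DelayedVersionTopicWriteSketch {
  // Assumed shape for illustration: wait for all view-topic writes to complete,
  // then hand each buffered consumed record to the version-topic producer.
  public static <R> void queueUpWrites(
      List<CompletableFuture<Void>> viewWriteFutures,
      Consumer<R> produceToVersionTopic,
      List<R> bufferedConsumedRecords) {
    CompletableFuture.allOf(viewWriteFutures.toArray(new CompletableFuture[0]))
        .thenRun(() -> bufferedConsumedRecords.forEach(produceToVersionTopic));
  }
}
```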
