diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index be4d57e637c..efd590ed0c4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -104,7 +104,7 @@ public class VeniceChangelogConsumerImpl implements VeniceChangelogConsumer { private static final Logger LOGGER = LogManager.getLogger(VeniceChangelogConsumerImpl.class); private static final int MAX_SUBSCRIBE_RETRIES = 5; - private static final String ROCKSDB_BUFFER_FOLDER = "rocksdb-chunk-buffer"; + private static final String ROCKSDB_BUFFER_FOLDER_PREFIX = "rocksdb-chunk-buffer-"; protected long subscribeTime = Long.MAX_VALUE; protected final ReadWriteLock subscriptionLock = new ReentrantReadWriteLock(); @@ -191,8 +191,11 @@ public VeniceChangelogConsumerImpl( throw new VeniceException("bootstrapFileSystemPath must be configured for consuming view store: " + storeName); } // Create a new folder in user provided path so if the path contains other important files we don't drop it. - rocksDBBufferProperties - .put(DATA_BASE_PATH, RocksDBUtils.composeStoreDbDir(rocksDBBufferPath, ROCKSDB_BUFFER_FOLDER)); + rocksDBBufferProperties.put( + DATA_BASE_PATH, + RocksDBUtils.composeStoreDbDir( + rocksDBBufferPath, + ROCKSDB_BUFFER_FOLDER_PREFIX + changelogClientConfig.getConsumerName())); // These properties are required to build a VeniceServerConfig but is never used by RocksDBStorageEngineFactory. // Instead of setting these configs, we could refactor RocksDBStorageEngineFactory to take a more generic config. 
rocksDBBufferProperties.put(CLUSTER_NAME, ""); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 5c953d514dd..f9cc575baa8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -71,6 +71,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -456,7 +457,8 @@ private PubSubMessageProcessedResult processActiveActiveMessage( consumerRecord.getTopicPartition(), valueManifestContainer, beforeProcessingBatchRecordsTimestampMs)); - if (hasChangeCaptureView || (hasComplexVenicePartitionerMaterializedView && msgType == MessageType.DELETE)) { + if (hasChangeCaptureView || hasFilterByFieldsMaterializedView + || (hasComplexVenicePartitionerMaterializedView && msgType == MessageType.DELETE)) { /** * Since this function will update the transient cache before writing the view, and if there is * a change capture view writer, we need to lookup first, otherwise the transient cache will be populated @@ -655,8 +657,9 @@ protected void processMessageAndMaybeProduceToKafka( // following function // call in this context much less obtrusive, however, it implies that all views can only work for AA stores - // Write to views - Runnable produceToVersionTopic = () -> producePutOrDeleteToKafka( + // Write to views. In A/A ingestion we never need to delay VT writes. Using local variables is sufficient to + // define the produceToVersionTopic consumer. + Consumer produceToVersionTopic = (ignored) -> producePutOrDeleteToKafka( mergeConflictResultWrapper, partitionConsumptionState, keyBytes, @@ -676,25 +679,25 @@ protected void processMessageAndMaybeProduceToKafka( ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get(); int oldValueSchemaId = oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId(); - Lazy valueProvider = mergeConflictResultWrapper.getValueProvider(); // The helper function takes in a BiFunction but the parameter for view partition set will never be used and // always null for A/A ingestion of the RT topic. queueUpVersionTopicWritesWithViewWriters( partitionConsumptionState, - (viewWriter, ignored) -> viewWriter.processRecord( + (viewWriter) -> viewWriter.processRecord( mergeConflictResultWrapper.getUpdatedValueBytes(), oldValueBB, keyBytes, mergeConflictResult.getValueSchemaId(), oldValueSchemaId, mergeConflictResult.getRmdRecord(), - valueProvider), - null, - produceToVersionTopic); + mergeConflictResultWrapper.getValueProvider(), + mergeConflictResultWrapper.getDeserializedOldValueProvider()), + produceToVersionTopic, + Collections.singletonList(consumerRecordWrapper)); } else { // This function may modify the original record in KME and it is unsafe to use the payload from KME directly // after this call. 
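[Editor's note - illustration only, not part of the patch. The A/A path above now expresses the version-topic write as a Consumer and gates it on the view-writer futures. A minimal sketch of that ordering contract, with simplified and hypothetical names (ViewThenVersionTopicSketch, queueUp); the real logic lives in queueUpVersionTopicWritesWithViewWriters later in this patch.]

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;

    final class ViewThenVersionTopicSketch<T> {
      // Previous VT write and all view-writer futures must complete before this VT write is issued.
      CompletableFuture<Void> queueUp(
          CompletableFuture<Void> lastVtWrite,
          List<CompletableFuture<Void>> viewWrites,
          Consumer<T> vtWrite,
          List<T> bufferedRecords) {
        CompletableFuture<Void> currentVtWrite = new CompletableFuture<>();
        CompletableFuture<?>[] gates = new CompletableFuture<?>[viewWrites.size() + 1];
        int i = 0;
        gates[i++] = lastVtWrite;
        for (CompletableFuture<Void> viewWrite: viewWrites) {
          gates[i++] = viewWrite;
        }
        CompletableFuture.allOf(gates).whenComplete((ignored, error) -> {
          if (error == null) {
            bufferedRecords.forEach(vtWrite); // flush each buffered record to the version topic, in order
            currentVtWrite.complete(null);
          } else {
            currentVtWrite.completeExceptionally(error);
          }
        });
        return currentVtWrite; // chained as the next call's lastVtWrite to preserve write ordering
      }
    }

[End editor's note.]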
- produceToVersionTopic.run(); + produceToVersionTopic.accept(consumerRecordWrapper); } } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index b79924cc7ab..7ba8aef9378 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -31,8 +31,8 @@ import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; +import com.linkedin.davinci.store.record.ByteBufferValueRecord; import com.linkedin.davinci.store.record.ValueRecord; -import com.linkedin.davinci.store.view.ChangeCaptureViewWriter; import com.linkedin.davinci.store.view.MaterializedViewWriter; import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.davinci.validation.DivSnapshot; @@ -80,6 +80,7 @@ import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.serialization.AvroStoreDeserializerCache; +import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; import com.linkedin.venice.serializer.RecordDeserializer; @@ -91,7 +92,6 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; -import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.ChunkAwareCallback; import com.linkedin.venice.writer.LeaderCompleteState; import com.linkedin.venice.writer.LeaderMetadataWrapper; @@ -124,11 +124,13 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.LongPredicate; import java.util.function.Predicate; import java.util.function.Supplier; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.logging.log4j.LogManager; @@ -218,6 +220,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask { protected final Map viewWriters; protected final boolean hasChangeCaptureView; protected final boolean hasComplexVenicePartitionerMaterializedView; + protected final boolean hasFilterByFieldsMaterializedView; protected final AvroStoreDeserializerCache storeDeserializerCache; @@ -227,6 +230,8 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask { private final Version version; protected final ExecutorService aaWCIngestionStorageLookupThreadPool; + private final Lazy chunkingSuffixSerializer = + Lazy.of(KeyWithChunkingSuffixSerializer::new); public LeaderFollowerStoreIngestionTask( StorageService storageService, @@ -353,21 +358,31 @@ public LeaderFollowerStoreIngestionTask( schemaRepository.getKeySchema(store.getName()).getSchema()); boolean 
tmpValueForHasChangeCaptureViewWriter = false; boolean tmpValueForHasComplexVenicePartitioner = false; + boolean tmpValueForHasFilterByFields = false; for (Map.Entry viewWriter: viewWriters.entrySet()) { - if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) { + if (viewWriter.getValue().getViewWriterType() == VeniceViewWriter.ViewWriterType.CHANGE_CAPTURE_VIEW) { tmpValueForHasChangeCaptureViewWriter = true; } else if (viewWriter.getValue().getViewWriterType() == VeniceViewWriter.ViewWriterType.MATERIALIZED_VIEW) { - if (((MaterializedViewWriter) viewWriter.getValue()).isComplexVenicePartitioner()) { + MaterializedViewWriter materializedViewWriter = (MaterializedViewWriter) viewWriter.getValue(); + if (materializedViewWriter.isComplexVenicePartitioner()) { tmpValueForHasComplexVenicePartitioner = true; } + if (materializedViewWriter.isProjectionEnabled()) { + materializedViewWriter.configureWriterForProjection(compressor); + } + if (materializedViewWriter.isFilterByFieldsEnabled()) { + tmpValueForHasFilterByFields = true; + } } } hasChangeCaptureView = tmpValueForHasChangeCaptureViewWriter; hasComplexVenicePartitionerMaterializedView = tmpValueForHasComplexVenicePartitioner; + hasFilterByFieldsMaterializedView = tmpValueForHasFilterByFields; } else { viewWriters = Collections.emptyMap(); hasChangeCaptureView = false; hasComplexVenicePartitionerMaterializedView = false; + hasFilterByFieldsMaterializedView = false; } this.storeDeserializerCache = new AvroStoreDeserializerCache( builder.getSchemaRepo(), @@ -3179,9 +3194,19 @@ private PubSubMessageProcessedResult processMessage( KafkaMessageEnvelope kafkaValue = consumerRecord.getValue(); byte[] keyBytes = kafkaKey.getKey(); MessageType msgType = MessageType.valueOf(kafkaValue.messageType); - Lazy valueProvider; + Lazy valueProvider = Lazy.of(() -> null); + Lazy oldValueProvider = Lazy.of(() -> null); switch (msgType) { case PUT: + if (hasFilterByFieldsMaterializedView) { + GenericRecord oldValue = readStoredValueRecord( + partitionConsumptionState, + keyBytes, + schemaRepository.getSupersetSchema(storeName).getId(), + consumerRecord.getTopicPartition(), + new ChunkedValueManifestContainer()); + oldValueProvider = Lazy.of(() -> oldValue); + } Put put = (Put) kafkaValue.payloadUnion; // Value provider should use un-compressed data. final ByteBuffer rawPutValue = put.putValue; @@ -3221,7 +3246,8 @@ private PubSubMessageProcessedResult processMessage( null); } - return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(put, null, false, valueProvider)); + return new PubSubMessageProcessedResult( + new WriteComputeResultWrapper(put, null, false, valueProvider, oldValueProvider)); case UPDATE: /** @@ -3263,7 +3289,11 @@ private PubSubMessageProcessedResult processMessage( readerValueSchemaId, consumerRecord.getTopicPartition(), valueManifestContainer); - + if (hasFilterByFieldsMaterializedView) { + // Copy the currValue since it will be used for in-place update(s). 
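[Editor's note - illustration only, not part of the patch. The deep copy taken below matters because write-compute mutates currValue in place; a shallow reference would let those mutations leak into the captured old-value snapshot. The variable name in this sketch is hypothetical.]

    // Avro's copy constructor with deepCopy=true isolates the snapshot from later in-place field updates.
    GenericRecord snapshotOfOldValue = new GenericData.Record((GenericData.Record) currValue, true);

[End editor's note.]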
+ GenericRecord oldValue = new GenericData.Record((GenericData.Record) currValue, true); + oldValueProvider = Lazy.of(() -> oldValue); + } final byte[] updatedValueBytes; final ChunkedValueManifest oldValueManifest = valueManifestContainer.getManifest(); WriteComputeResult writeComputeResult; @@ -3321,25 +3351,22 @@ private PubSubMessageProcessedResult processMessage( updatedPut, oldValueManifest, false, - Lazy.of(writeComputeResult::getUpdatedValue))); + Lazy.of(writeComputeResult::getUpdatedValue), + oldValueProvider)); } case DELETE: - Lazy oldValueProvider; if (hasComplexVenicePartitionerMaterializedView) { // Best-effort to provide the old value for delete operation in case needed by a ComplexVeniceWriter to // generate deletes for materialized view topic partition(s). We need to do a non-lazy lookup before, so we // have a chance of getting the old value before the transient record cache is updated to null as part of // processing the DELETE. - int oldValueReaderSchemaId = schemaRepository.getSupersetSchema(storeName).getId(); GenericRecord oldValue = readStoredValueRecord( partitionConsumptionState, keyBytes, - oldValueReaderSchemaId, + schemaRepository.getSupersetSchema(storeName).getId(), consumerRecord.getTopicPartition(), new ChunkedValueManifestContainer()); oldValueProvider = Lazy.of(() -> oldValue); - } else { - oldValueProvider = Lazy.of(() -> null); } /** * For WC enabled stores update the transient record map with the latest {key,null} for similar reason as mentioned in PUT above. @@ -3348,7 +3375,8 @@ private PubSubMessageProcessedResult processMessage( partitionConsumptionState .setTransientRecord(kafkaClusterId, consumerRecord.getPosition().getNumericOffset(), keyBytes, -1, null); } - return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false, oldValueProvider)); + return new PubSubMessageProcessedResult( + new WriteComputeResultWrapper(null, null, false, valueProvider, oldValueProvider)); default: throw new VeniceMessageException( @@ -3387,31 +3415,84 @@ protected void processMessageAndMaybeProduceToKafka( if (msgType.equals(UPDATE) && writeComputeResultWrapper.isSkipProduce()) { return; } - Runnable produceToVersionTopic = () -> produceToLocalKafkaHelper( - consumerRecord, - partitionConsumptionState, - writeComputeResultWrapper, - partition, - kafkaUrl, - kafkaClusterId, - beforeProcessingRecordTimestampNs); - // Write to views + Put newPut = writeComputeResultWrapper.getNewPut(); + // Write to views. 
if (hasViewWriters()) { - Put newPut = writeComputeResultWrapper.getNewPut(); - Map> viewPartitionMap = null; + consumerRecordWrapper.setProcessedResult(new PubSubMessageProcessedResult(writeComputeResultWrapper)); + consumerRecordWrapper.setBeforeProcessingPerRecordTimestampNs(beforeProcessingRecordTimestampNs); + Consumer produceToVersionTopic = + (messageWrapper) -> produceToLocalKafkaHelper( + messageWrapper.getMessage(), + partitionConsumptionState, + messageWrapper.getProcessedResult().getWriteComputeResultWrapper(), + partition, + kafkaUrl, + kafkaClusterId, + messageWrapper.getBeforeProcessingPerRecordTimestampNs()); if (!partitionConsumptionState.isEndOfPushReceived()) { - // NR pass-through records are expected to carry view partition map in the message header - viewPartitionMap = ViewUtils.extractViewPartitionMap(consumerRecord.getPubSubMessageHeaders()); + // Native replication (NR) pass-through mode + partitionConsumptionState.addMessageToPendingMessages(consumerRecordWrapper); + ByteBufferValueRecord valueRecord = chunkAssembler.get() + .bufferAndAssembleRecord( + consumerRecord.getTopicPartition(), + newPut.schemaId, + keyBytes, + newPut.putValue, + consumerRecord.getPosition().getNumericOffset(), + compressor.get()); + if (valueRecord != null) { + // Only process full records + Lazy valueProvider = Lazy.of(() -> { + try { + return storeDeserializerCache.getDeserializer(valueRecord.writerSchemaId(), valueRecord.writerSchemaId()) + .deserialize(compressor.get().decompress(valueRecord.value())); + } catch (IOException e) { + throw new VeniceException( + "Unable to provide value during NR pass-through due to decompression failure", + e); + } + }); + final byte[] originalKeyBytes; + if (isChunked) { + // valueRecord is not null for full records and manifest and the key for both is serialized with non chunked + // key suffix in source version topic if chunking is enabled. + originalKeyBytes = chunkingSuffixSerializer.get().extractKeyFromNonChunkedKeySuffix(keyBytes); + } else { + originalKeyBytes = keyBytes; + } + queueUpVersionTopicWritesWithViewWriters( + partitionConsumptionState, + (viewWriter) -> viewWriter.processRecord( + valueRecord.value(), + originalKeyBytes, + valueRecord.writerSchemaId(), + valueProvider, + Lazy.of(() -> null)), + produceToVersionTopic, + partitionConsumptionState.getAndClearPendingMessagesToLocalVT()); + } + } else { + // NR standard mode where upstream is RT topic. 
+ queueUpVersionTopicWritesWithViewWriters( + partitionConsumptionState, + (viewWriter) -> viewWriter.processRecord( + newPut.putValue, + keyBytes, + newPut.schemaId, + writeComputeResultWrapper.getValueProvider(), + writeComputeResultWrapper.getOldValueProvider()), + produceToVersionTopic, + Collections.singletonList(consumerRecordWrapper)); } - Lazy newValueProvider = writeComputeResultWrapper.getValueProvider(); - queueUpVersionTopicWritesWithViewWriters( - partitionConsumptionState, - (viewWriter, viewPartitionSet) -> viewWriter - .processRecord(newPut.putValue, keyBytes, newPut.schemaId, viewPartitionSet, newValueProvider), - viewPartitionMap, - produceToVersionTopic); } else { - produceToVersionTopic.run(); + produceToLocalKafkaHelper( + consumerRecord, + partitionConsumptionState, + writeComputeResultWrapper, + partition, + kafkaUrl, + kafkaClusterId, + beforeProcessingRecordTimestampNs); } } @@ -4100,9 +4181,9 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio protected void queueUpVersionTopicWritesWithViewWriters( PartitionConsumptionState partitionConsumptionState, - BiFunction, CompletableFuture> viewWriterRecordProcessor, - Map> viewPartitionMap, - Runnable versionTopicWrite) { + Function> viewWriterRecordProcessor, + Consumer versionTopicWrite, + List consumerRecordWrappers) { long preprocessingTime = System.currentTimeMillis(); CompletableFuture currentVersionTopicWrite = new CompletableFuture<>(); CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1]; @@ -4110,21 +4191,15 @@ protected void queueUpVersionTopicWritesWithViewWriters( // The first future is for the previous write to VT viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture(); for (VeniceViewWriter writer: viewWriters.values()) { - Set viewPartitionSet = null; - if (viewPartitionMap != null && writer.getViewWriterType() == VeniceViewWriter.ViewWriterType.MATERIALIZED_VIEW) { - MaterializedViewWriter mvWriter = (MaterializedViewWriter) writer; - viewPartitionSet = viewPartitionMap.get(mvWriter.getViewName()); - if (viewPartitionSet == null) { - throw new VeniceException("Unable to find view partition set for view: " + mvWriter.getViewName()); - } - } - viewWriterFutures[index++] = viewWriterRecordProcessor.apply(writer, viewPartitionSet); + viewWriterFutures[index++] = viewWriterRecordProcessor.apply(writer); } hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); if (exception == null) { - versionTopicWrite.run(); + for (PubSubMessageProcessedResultWrapper consumerRecordWrapper: consumerRecordWrappers) { + versionTopicWrite.accept(consumerRecordWrapper); + } currentVersionTopicWrite.complete(null); } else { VeniceException veniceException = new VeniceException(exception); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java index c5ded56590e..bf3fb2779c5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java @@ -26,10 +26,14 
@@ public class MergeConflictResultWrapper { private final ByteBuffer updatedRmdBytes; /** - * Best-effort deserialized value provider that provides the updated value for PUT/UPDATE and the old value for - * DELETE. + * Best-effort deserialized value provider that provides the updated value for PUT/UPDATE. */ private final Lazy valueProvider; + /** + * Best-effort deserialized value provider that provides the old value prior to the PUT/UPDATE/DELETE. Returns null if the + * key/value didn't exist. + */ + private final Lazy deserializedOldValueProvider; public MergeConflictResultWrapper( MergeConflictResult mergeConflictResult, @@ -47,15 +51,21 @@ public MergeConflictResultWrapper( this.oldValueManifestContainer = oldValueManifestContainer; this.updatedValueBytes = updatedValueBytes; this.updatedRmdBytes = updatedRmdBytes; - if (updatedValueBytes == null) { - // this is a DELETE + + // We will always configure the deserializedOldValueProvider. Theoretically we could cache the deserialized old + // value in the UPDATE branch, but it would require a deep copy since the record is used for in-place update(s). To + // reduce complexity we are just going to deserialize the old bytes. + this.deserializedOldValueProvider = Lazy.of(() -> { ByteBufferValueRecord oldValue = oldValueProvider.get(); if (oldValue == null || oldValue.value() == null) { - this.valueProvider = Lazy.of(() -> null); + return null; } else { - this.valueProvider = - Lazy.of(() -> deserializerProvider.apply(oldValue.writerSchemaId()).deserialize(oldValue.value())); + return deserializerProvider.apply(oldValue.writerSchemaId()).deserialize(oldValue.value()); } + }); + if (updatedValueBytes == null) { + // this is a DELETE + this.valueProvider = Lazy.of(() -> null); } else { // this is a PUT or UPDATE if (mergeConflictResult.getDeserializedValue().isPresent()) { @@ -100,10 +110,13 @@ public ByteBuffer getUpdatedRmdBytes() { /** * Return a best-effort value provider with the following behaviors: * 1. returns the new value provider for PUT and UPDATE. - * 2. returns the old value for DELETE (null for non-existent key). * 3. returns null if the value is not available. */ public Lazy getValueProvider() { return valueProvider; } + + public Lazy getDeserializedOldValueProvider() { + return deserializedOldValueProvider; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index 5e381d9dd0e..cca3d4a12b9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -20,6 +20,7 @@ import com.linkedin.venice.writer.VeniceWriter; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -225,6 +226,7 @@ enum LatchStatus { // veniceWriterLazyRef could be set and get in different threads, mark it volatile.
private volatile Lazy> veniceWriterLazyRef; + private final LinkedList pendingMessagesToLocalVT; public PartitionConsumptionState(String replicaId, int partition, OffsetRecord offsetRecord, boolean hybrid) { this.replicaId = replicaId; @@ -270,6 +272,7 @@ public PartitionConsumptionState(String replicaId, int partition, OffsetRecord o this.leaderCompleteState = LeaderCompleteState.LEADER_NOT_COMPLETED; this.lastLeaderCompleteStateUpdateInMs = 0; this.pendingReportIncPushVersionList = offsetRecord.getPendingReportIncPushVersionList(); + this.pendingMessagesToLocalVT = new LinkedList<>(); } public int getPartition() { @@ -896,4 +899,14 @@ public void clearPendingReportIncPushVersionList() { pendingReportIncPushVersionList.clear(); offsetRecord.setPendingReportIncPushVersionList(pendingReportIncPushVersionList); } + + public void addMessageToPendingMessages(PubSubMessageProcessedResultWrapper message) { + pendingMessagesToLocalVT.add(message); + } + + public List getAndClearPendingMessagesToLocalVT() { + LinkedList pendingMessages = (LinkedList) pendingMessagesToLocalVT.clone(); + pendingMessagesToLocalVT.clear(); + return pendingMessages; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java index c6fed613ae4..28ce709f0c9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java @@ -6,6 +6,7 @@ public class PubSubMessageProcessedResultWrapper { private final DefaultPubSubMessage message; private PubSubMessageProcessedResult processedResult; + private long beforeProcessingPerRecordTimestampNs; public PubSubMessageProcessedResultWrapper(DefaultPubSubMessage message) { this.message = message; @@ -22,4 +23,12 @@ public PubSubMessageProcessedResult getProcessedResult() { public void setProcessedResult(PubSubMessageProcessedResult processedResult) { this.processedResult = processedResult; } + + public long getBeforeProcessingPerRecordTimestampNs() { + return beforeProcessingPerRecordTimestampNs; + } + + public void setBeforeProcessingPerRecordTimestampNs(long beforeProcessingPerRecordTimestampNs) { + this.beforeProcessingPerRecordTimestampNs = beforeProcessingPerRecordTimestampNs; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index e38faa09876..abd75e8007a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -333,7 +333,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final IngestionNotificationDispatcher ingestionNotificationDispatcher; - protected final InMemoryChunkAssembler chunkAssembler; + protected final Lazy chunkAssembler; private final Optional cacheBackend; private final Schema recordTransformerInputValueSchema; private final AvroGenericDeserializer recordTransformerKeyDeserializer; @@ -492,9 +492,9 @@ public StoreIngestionTask( new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion); 
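[Editor's note - illustration only, not part of the patch. The pending-message buffer added to PartitionConsumptionState above is single-consumer state: the leader buffers NR pass-through records and later drains a snapshot once the corresponding view writes complete. A minimal sketch of that copy-then-clear contract with hypothetical names (PendingBufferSketch, getAndClear); as a side note, an ArrayList copy constructor would avoid the unchecked (LinkedList) clone() cast used in the patch.]

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;

    final class PendingBufferSketch<E> {
      private final LinkedList<E> pending = new LinkedList<>();

      void add(E element) {
        pending.add(element);
      }

      // Returns a snapshot and empties the buffer, so the caller can hand the snapshot
      // to an async completion callback while new elements keep accumulating.
      List<E> getAndClear() {
        List<E> snapshot = new ArrayList<>(pending);
        pending.clear();
        return snapshot;
      }
    }

[End editor's note.]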
this.missingSOPCheckExecutor.execute(() -> waitForStateVersion(kafkaVersionTopic)); this.cacheBackend = cacheBackend; + this.chunkAssembler = Lazy.of(() -> new InMemoryChunkAssembler(new InMemoryStorageEngine(storeName))); if (recordTransformerConfig != null && recordTransformerConfig.getRecordTransformerFunction() != null) { - this.chunkAssembler = new InMemoryChunkAssembler(new InMemoryStorageEngine(storeName)); Schema keySchema = schemaRepository.getKeySchema(storeName).getSchema(); this.recordTransformerKeyDeserializer = new AvroGenericDeserializer(keySchema, keySchema); this.recordTransformerInputValueSchema = schemaRepository.getSupersetOrLatestValueSchema(storeName).getSchema(); @@ -536,7 +536,6 @@ public StoreIngestionTask( this.recordTransformerKeyDeserializer = null; this.recordTransformerInputValueSchema = null; this.recordTransformerDeserializersByPutSchemaId = null; - this.chunkAssembler = null; } this.localKafkaServer = this.kafkaProps.getProperty(KAFKA_BOOTSTRAP_SERVERS); @@ -3835,13 +3834,14 @@ private int processKafkaDataMessage( if (recordTransformer != null && messageType == MessageType.PUT) { long recordTransformerStartTime = System.nanoTime(); - ByteBufferValueRecord assembledRecord = chunkAssembler.bufferAndAssembleRecord( - consumerRecord.getTopicPartition(), - put.getSchemaId(), - keyBytes, - put.getPutValue(), - consumerRecord.getPosition().getNumericOffset(), - compressor.get()); + ByteBufferValueRecord assembledRecord = chunkAssembler.get() + .bufferAndAssembleRecord( + consumerRecord.getTopicPartition(), + put.getSchemaId(), + keyBytes, + put.getPutValue(), + consumerRecord.getPosition().getNumericOffset(), + compressor.get()); // Current record is a chunk. We only write to the storage engine for fully assembled records if (assembledRecord == null) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java index a4e4a8f459c..b34f338a0d2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java @@ -17,20 +17,23 @@ public class WriteComputeResultWrapper { */ private final boolean skipProduce; private final Lazy valueProvider; + private final Lazy oldValueProvider; public WriteComputeResultWrapper(Put newPut, ChunkedValueManifest oldValueManifest, boolean skipProduce) { - this(newPut, oldValueManifest, skipProduce, Lazy.of(() -> null)); + this(newPut, oldValueManifest, skipProduce, Lazy.of(() -> null), Lazy.of(() -> null)); } public WriteComputeResultWrapper( Put newPut, ChunkedValueManifest oldValueManifest, boolean skipProduce, - Lazy valueProvider) { + Lazy valueProvider, + Lazy oldValueProvider) { this.newPut = newPut; this.oldValueManifest = oldValueManifest; this.skipProduce = skipProduce; this.valueProvider = valueProvider; + this.oldValueProvider = oldValueProvider; } public Put getNewPut() { @@ -48,10 +51,17 @@ public boolean isSkipProduce() { /** * Return a best-effort value provider with the following behaviors: * 1. returns the new value provider for PUT and UPDATE. - * 2. returns the old value for DELETE (null for non-existent key). - * 3. returns null if the value is not available. + * 2. returns null if the value is not available. 
*/ public Lazy getValueProvider() { - return this.valueProvider; + return valueProvider; + } + + /** + * Return a best-effort old value provider that returns the old value prior to the PUT/UPDATE/DELETE. Returns null if + * the key/value didn't exist or is not available. + */ + public Lazy getOldValueProvider() { + return oldValueProvider; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java index 25206becf7b..5ca526b336c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java @@ -2,11 +2,14 @@ import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.meta.ClusterInfoProvider; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.meta.ReadOnlyViewStore; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreDataChangedListener; import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.schema.GeneratedSchemaID; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.rmd.RmdSchemaEntry; @@ -66,6 +69,31 @@ public SchemaEntry getKeySchema(String storeName) { @Override public SchemaEntry getValueSchema(String storeName, int id) { + if (VeniceView.isViewStore(storeName)) { + // We need to check if the view has projection enabled. + Store store = getStoreOrThrow(VeniceView.getStoreNameFromViewStoreName(storeName)); + String viewName = VeniceView.getViewNameFromViewStoreName(storeName); + // Search through the versions because it's possible that the view config was removed from the store config but a + // consumer is still ingesting the current/backup version that had the config. Ideally, we should only delete a + // view once we are sure all consumers are stopped (DVC/CC clients) but we can't really enforce that. + // In addition, this only works because view configs are immutable, otherwise we would need to know exactly which + // store version the value schema belongs to in order to provide the correct projection schema. + for (Version version: store.getVersions()) { + if (version.getViewConfigs().containsKey(viewName)) { + ViewConfig viewConfig = version.getViewConfigs().get(viewName); + if (viewConfig.getViewParameters() + .containsKey(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_SCHEMA.name())) { + String projectionSchemaString = viewConfig.getViewParameters() + .get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_SCHEMA.name()); + return new SchemaEntry(id, projectionSchemaString); + } else { + // This view does not have projection enabled. The regular store value schema will be returned accordingly.
+ return nativeMetadataRepository.getValueSchema(VeniceView.getStoreName(storeName), id); + } + } + } + throw new VeniceNoStoreException(storeName); + } return nativeMetadataRepository.getValueSchema(VeniceView.getStoreName(storeName), id); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java index c29be3e2f3f..e884e308a9b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -67,7 +66,8 @@ public CompletableFuture processRecord( int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord, - Lazy valueProvider) { + Lazy valueProvider, + Lazy oldValueProvider) { // TODO: not sold about having currentValue in the interface but it VASTLY simplifies a lot of things with regards // to dealing with compression/chunking/etc. in the storage layer. @@ -89,8 +89,8 @@ public CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - Set viewPartitionSet, - Lazy newValueProvider) { + Lazy newValueProvider, + Lazy oldValueProvider) { // No op return CompletableFuture.completedFuture(null); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java index 861327c6c8b..f887944bbb9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java @@ -2,6 +2,7 @@ import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; +import com.linkedin.venice.compression.VeniceCompressor; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; @@ -12,13 +13,13 @@ import com.linkedin.venice.utils.ByteUtils; import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.ComplexVeniceWriter; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; import java.nio.ByteBuffer; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -33,7 +34,7 @@ public class MaterializedViewWriter extends VeniceViewWriter { private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory; private final MaterializedView internalView; private final String materializedViewTopicName; - private Lazy veniceWriter; + private Lazy> veniceWriter; public MaterializedViewWriter( VeniceConfigLoader props, @@ -46,11 +47,24 @@ public MaterializedViewWriter( new MaterializedView(props.getCombinedProperties().toProperties(), 
version.getStoreName(), extraViewParameters); materializedViewTopicName = internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get(); - this.veniceWriter = Lazy.of( + veniceWriter = Lazy.of( () -> new VeniceWriterFactory(props.getCombinedProperties().toProperties(), pubSubProducerAdapterFactory, null) .createComplexVeniceWriter(buildWriterOptions())); } + public void configureWriterForProjection(Lazy compressor) { + if (!isProjectionEnabled()) { + throw new VeniceException( + "Cannot configure writer for projection because projection is not enabled for view:" + + internalView.getViewName()); + } + ViewUtils.configureWriterForProjection( + veniceWriter.get(), + getViewName(), + compressor, + internalView.getProjectionSchema()); + } + /** * package private for testing purpose */ @@ -66,8 +80,9 @@ public CompletableFuture processRecord( int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord, - Lazy valueProvider) { - return processRecord(newValue, key, newValueSchemaId, null, valueProvider); + Lazy valueProvider, + Lazy oldValueProvider) { + return processRecord(newValue, key, newValueSchemaId, valueProvider, oldValueProvider); } /** @@ -79,24 +94,24 @@ public CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - Set viewPartitionSet, - Lazy newValueProvider) { - byte[] newValueBytes = newValue == null ? null : ByteUtils.extractByteArray(newValue); - if (viewPartitionSet != null) { - if (newValue == null) { - // This is unexpected because we only attach view partition map for PUT records. - throw new VeniceException( - "Encountered a null PUT record while having view partition map in the message header"); - } - // Forward the record to corresponding view partition without any processing (NR pass-through mode). - return veniceWriter.get().forwardPut(key, newValueBytes, newValueSchemaId, viewPartitionSet); - } + Lazy newValueProvider, + Lazy oldValueProvider) { if (newValue == null) { - // This is a delete operation. newValueProvider will contain the old value in a best effort manner. The old value - // might not be available if we are deleting a non-existing key. - return veniceWriter.get().complexDelete(key, newValueProvider); + // This is a delete operation. The old value might not be available if we are deleting a non-existing key. 
+ return veniceWriter.get().complexDelete(key, oldValueProvider); } - return veniceWriter.get().complexPut(key, newValueBytes, newValueSchemaId, newValueProvider); + if (isFilterByFieldsEnabled()) { + // We only support one type of filter operation which is to skip records if the filter by fields didn't change + if (!ViewUtils.changeFilter( + oldValueProvider.get(), + newValueProvider.get(), + internalView.getFilterByFields(), + getViewName())) { + // Did not pass the change filter requirement, skip this record + return CompletableFuture.completedFuture(null); + } + } + return veniceWriter.get().complexPut(key, ByteUtils.extractByteArray(newValue), newValueSchemaId, newValueProvider); } @Override @@ -132,4 +147,12 @@ public boolean isComplexVenicePartitioner() { public String getViewName() { return internalView.getViewName(); } + + public boolean isProjectionEnabled() { + return internalView.getProjectionSchema() != null; + } + + public boolean isFilterByFieldsEnabled() { + return !internalView.getFilterByFields().isEmpty(); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java index 9397c4e0432..a1b5e67bf98 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java @@ -15,7 +15,6 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -69,9 +68,9 @@ public VeniceViewWriter( * @param key the key of the record that designates newValue and oldValue * @param newValueSchemaId the schemaId of the incoming record * @param oldValueSchemaId the schemaId of the old record - * @param replicationMetadataRecord the associated RMD for the incoming record. - * @param valueProvider to provide the corresponding deserialized newValue for PUT and UPDATE or the old value for the - * given key for DELETE. + * @param replicationMetadataRecord the associated RMD for the incoming record + * @param valueProvider to provide the deserialized new value + * @param oldValueProvider to provide the deserialized old value */ public abstract CompletableFuture processRecord( ByteBuffer newValue, @@ -80,7 +79,8 @@ public abstract CompletableFuture processRecord( int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord, - Lazy valueProvider); + Lazy valueProvider, + Lazy oldValueProvider); /** * To be called as a given ingestion task consumes each record. This is called prior to writing to a @@ -89,18 +89,15 @@ public abstract CompletableFuture processRecord( * @param newValue the incoming fully specified value which hasn't yet been committed to Venice * @param key the key of the record that designates newValue and oldValue * @param newValueSchemaId the schemaId of the incoming record - * @param viewPartitionSet set of view partitions this record should be processed to. This is used in NR - * pass-through when remote region leaders can forward record or chunks of a record - * to the correct view partitions without the need to perform chunk assembly or - * repartitioning. 
- * @param newValueProvider to provide the deserialized new value + * @param valueProvider to provide the deserialized new value + * @param oldValueProvider to provide the deserialized old value */ public abstract CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - Set viewPartitionSet, - Lazy newValueProvider); + Lazy valueProvider, + Lazy oldValueProvider); public abstract ViewWriterType getViewWriterType(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java index f1ec0b1fdc0..4d2a234ead3 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java @@ -69,6 +69,7 @@ import it.unimi.dsi.fastutil.objects.Object2IntMaps; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -314,10 +315,10 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc .thenReturn(CompletableFuture.completedFuture(null)); leaderFollowerStoreIngestionTask.queueUpVersionTopicWritesWithViewWriters( mockPartitionConsumptionState, - (viewWriter, viewPartitionSet) -> viewWriter - .processRecord(mock(ByteBuffer.class), new byte[1], 1, viewPartitionSet, Lazy.of(() -> null)), - null, - () -> writeToVersionTopic.set(true)); + (viewWriter) -> viewWriter + .processRecord(mock(ByteBuffer.class), new byte[1], 1, Lazy.of(() -> null), Lazy.of(() -> null)), + (consumerRecordWrapper) -> writeToVersionTopic.set(true), + Collections.singletonList(mock(PubSubMessageProcessedResultWrapper.class))); verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture(); ArgumentCaptor vtWriteFutureCaptor = ArgumentCaptor.forClass(CompletableFuture.class); verify(mockPartitionConsumptionState, times(1)).setLastVTProduceCallFuture(vtWriteFutureCaptor.capture()); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java index 5d2831d389a..3206e8f8dfd 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java @@ -249,17 +249,41 @@ public void testProcessRecord() throws ExecutionException, InterruptedException Lazy dummyValueProvider = Lazy.of(() -> null); // Update Case changeCaptureViewWriter - .processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider) + .processRecord( + NEW_VALUE, + OLD_VALUE, + KEY, + 1, + 1, + rmdRecordWithValueLevelTimeStamp, + dummyValueProvider, + dummyValueProvider) .get(); // Insert Case changeCaptureViewWriter - .processRecord(NEW_VALUE, null, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider) + .processRecord( + NEW_VALUE, + null, + KEY, + 1, + 1, + rmdRecordWithValueLevelTimeStamp, + dummyValueProvider, + dummyValueProvider) .get(); // Deletion Case changeCaptureViewWriter - .processRecord(null, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider) + 
.processRecord( + null, + OLD_VALUE, + KEY, + 1, + 1, + rmdRecordWithValueLevelTimeStamp, + dummyValueProvider, + dummyValueProvider) .get(); // Set up argument captors diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java index 46cace62ec1..7cc20200642 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java @@ -5,11 +5,9 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -19,7 +17,6 @@ import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.davinci.utils.UnitTestComplexPartitioner; -import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; @@ -34,22 +31,17 @@ import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.VeniceProperties; -import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.writer.ComplexVeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.testng.Assert; import org.testng.annotations.Test; @@ -141,39 +133,6 @@ public void testProcessControlMessage() { verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); } - @Test - public void testViewWriterCanForwardCorrectly() { - String storeName = "testStoreWithChunkedKeys"; - String viewName = "testMaterializedViewWithChunkedKeys"; - Version version = mock(Version.class); - doReturn(true).when(version).isChunkingEnabled(); - doReturn(true).when(version).isRmdChunkingEnabled(); - getMockStore(storeName, 1, version); - MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName); - viewParamsBuilder.setPartitionCount(6); - viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); - Map viewParamsMap = viewParamsBuilder.build(); - VeniceConfigLoader props = getMockProps(); - MaterializedViewWriter materializedViewWriter = new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap); - ComplexVeniceWriter veniceWriter = mock(ComplexVeniceWriter.class); - doReturn(CompletableFuture.completedFuture(null)).when(veniceWriter).forwardPut(any(), any(), 
anyInt(), any()); - materializedViewWriter.setVeniceWriter(veniceWriter); - byte[] keyBytes = new byte[5]; - byte[] valueBytes = new byte[10]; - ByteBuffer value = ByteBuffer.wrap(valueBytes); - Set viewPartitionSet = new HashSet<>(); - viewPartitionSet.add(1); - viewPartitionSet.add(4); - Lazy valueProvider = mock(Lazy.class); - Assert.assertThrows( - VeniceException.class, - () -> materializedViewWriter.processRecord(null, keyBytes, 1, viewPartitionSet, valueProvider)); - materializedViewWriter.processRecord(value, keyBytes, 1, viewPartitionSet, valueProvider); - verify(veniceWriter, times(1)).forwardPut(eq(keyBytes), eq(valueBytes), eq(1), eq(viewPartitionSet)); - verify(veniceWriter, never()).complexPut(any(), any(), anyInt(), any()); - verify(veniceWriter, never()).complexDelete(any(), any()); - } - @Test public void testIsComplexVenicePartitioner() { String storeName = "testStore"; diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java index 7a74eb2d981..34722db610c 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java @@ -310,8 +310,11 @@ private void setupReducerConf(JobConf jobConf, PushJobSetting pushJobSetting) { jobConf.set(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.materializedViewConfigFlatMap); jobConf.set(VALUE_SCHEMA_DIR, pushJobSetting.valueSchemaDir); jobConf.set(RMD_SCHEMA_DIR, pushJobSetting.rmdSchemaDir); + // Disable speculative execution regardless of user config if materialized view is present + jobConf.setReduceSpeculativeExecution(false); + } else { + jobConf.setReduceSpeculativeExecution(vpjProperties.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false)); } - jobConf.setReduceSpeculativeExecution(vpjProperties.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false)); int partitionCount = pushJobSetting.partitionCount; jobConf.setInt(PARTITION_COUNT, partitionCount); jobConf.setNumReduceTasks(partitionCount); diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java index 906d871a7eb..684859be152 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java @@ -165,7 +165,7 @@ public int getValueSchemaId() { private Lazy veniceWriterFactory; private AbstractVeniceWriter veniceWriter = null; private VeniceWriter mainWriter = null; - private ComplexVeniceWriter[] childWriters = null; + private ComplexVeniceWriter[] childWriters = null; private int valueSchemaId = -1; private int derivedValueSchemaId = -1; private boolean enableWriteCompute = false; @@ -431,7 +431,7 @@ private AbstractVeniceWriter createCompositeVeniceWriter version.setRmdChunkingEnabled(rmdChunkingEnabled); // Default deser and decompress function for simple partitioner where value provider is never going to be used. 
BiFunction valueExtractor = (valueBytes, valueSchemaId) -> null; - boolean complexPartitionerConfigured = false; + boolean valueExtractorConfigured = false; int index = 0; for (ViewConfig viewConfig: viewConfigMap.values()) { VeniceView view = ViewUtils @@ -439,9 +439,7 @@ private AbstractVeniceWriter createCompositeVeniceWriter String viewTopic = view.getTopicNamesAndConfigsForVersion(versionNumber).keySet().stream().findAny().get(); if (view instanceof MaterializedView) { MaterializedView materializedView = (MaterializedView) view; - if (materializedView.getViewPartitioner() - .getPartitionerType() == VenicePartitioner.VenicePartitionerType.COMPLEX - && !complexPartitionerConfigured) { + if (materializedView.isValueProviderNeeded() && !valueExtractorConfigured) { // Initialize value schemas, deser cache and other variables needed by ComplexVenicePartitioner initializeSchemaSourceAndDeserCache(); compressor.get(); @@ -461,10 +459,19 @@ private AbstractVeniceWriter createCompositeVeniceWriter .deserialize(decompressedBytes); }; // We only need to configure these variables once per CompositeVeniceWriter - complexPartitionerConfigured = true; + valueExtractorConfigured = true; } - childWriters[index++] = + ComplexVeniceWriter complexVeniceWriter = factory.createComplexVeniceWriter(view.getWriterOptionsBuilder(viewTopic, version).build()); + if (materializedView.getProjectionSchema() != null) { + // Configure writer for projection support + ViewUtils.configureWriterForProjection( + complexVeniceWriter, + materializedView.getViewName(), + compressor, + materializedView.getProjectionSchema()); + } + childWriters[index++] = complexVeniceWriter; } else { throw new UnsupportedOperationException("Only materialized view is supported in VPJ"); } @@ -608,10 +615,14 @@ protected void configureTask(VeniceProperties props) { writerProps.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, -1); EngineTaskConfigProvider engineTaskConfigProvider = getEngineTaskConfigProvider(); Properties jobProps = engineTaskConfigProvider.getJobProps(); - // Use the UUID bits created by the VPJ driver to build a producerGUID deterministically - writerProps.put(GuidUtils.GUID_GENERATOR_IMPLEMENTATION, GuidUtils.DETERMINISTIC_GUID_GENERATOR_IMPLEMENTATION); - writerProps.put(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS)); - writerProps.put(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS)); + // If materialized views are present we will disable speculative execution and use default GUID generator + if (this.props.getString(PUSH_JOB_VIEW_CONFIGS, "").isEmpty()) { + // Use the UUID bits created by the VPJ driver to build a producerGUID deterministically + writerProps.put(GuidUtils.GUID_GENERATOR_IMPLEMENTATION, GuidUtils.DETERMINISTIC_GUID_GENERATOR_IMPLEMENTATION); + writerProps.put(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS)); + writerProps + .put(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS)); + } return new VeniceWriterFactory(writerProps); }); diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java index 5ceb0714b8e..f82c5c54498 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java +++ 
b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java @@ -140,8 +140,8 @@ private CompletableFuture compositePut( Map> viewPartitionMap = new HashMap<>(); int index = 0; for (ComplexVeniceWriter writer: childWriters) { - // There should be an entry for every materialized view, even if the partition set is empty. This way we can - // differentiate between skipped view write and missing view partition info unexpectedly. + // There should be an entry for every materialized view in the view partition map, even if the partition set is + // empty. This way we can differentiate between skipped view write and missing view partition info unexpectedly. childFutures[index++] = writer.complexPut( key, value, diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java index ce9d27204f5..535d0254bb8 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java @@ -27,17 +27,27 @@ public enum MaterializedViewParameters { * Parameter key used to specify the partition count for the re-partition view. */ MATERIALIZED_VIEW_PARTITION_COUNT, - + /** + * Parameter key used to specify the top level fields to perform filtering on for records to be produced to the view. + */ + MATERIALIZED_VIEW_FILTER_BY_FIELDS, /** * Parameter key used to specify the top level fields to perform projection on for records in the materialized view. */ - MATERIALIZED_VIEW_PROJECTION_FIELDS; + MATERIALIZED_VIEW_PROJECTION_FIELDS, + + /** + * Parameter key used to persist the generated projection schema. 
This is meant for internal use and purposely not + * exposed to the {@link Builder} + */ + MATERIALIZED_VIEW_PROJECTION_SCHEMA; public static class Builder { - private String viewName; + private final String viewName; private String partitioner; private String partitionerParams; private String partitionCount; + private List filterByFields = Collections.emptyList(); private List projectionFields = Collections.emptyList(); public Builder(String viewName) { @@ -49,13 +59,15 @@ public Builder(String viewName, Map viewParams) { this.partitioner = viewParams.get(MATERIALIZED_VIEW_PARTITIONER.name()); this.partitionerParams = viewParams.get(MATERIALIZED_VIEW_PARTITIONER_PARAMS.name()); this.partitionCount = viewParams.get(MATERIALIZED_VIEW_PARTITION_COUNT.name()); + String filteringFieldsString = viewParams.get(MATERIALIZED_VIEW_FILTER_BY_FIELDS.name()); + if (filteringFieldsString != null) { + this.filterByFields = + parsePropertyStringToList(filteringFieldsString, MATERIALIZED_VIEW_FILTER_BY_FIELDS.name()); + } String projectionFieldsString = viewParams.get(MATERIALIZED_VIEW_PROJECTION_FIELDS.name()); if (projectionFieldsString != null) { - try { - this.projectionFields = ObjectMapperFactory.getInstance().readValue(projectionFieldsString, List.class); - } catch (JsonProcessingException e) { - throw new VeniceException("Failed to parse the provided projection fields: " + projectionFieldsString, e); - } + this.projectionFields = + parsePropertyStringToList(projectionFieldsString, MATERIALIZED_VIEW_PROJECTION_FIELDS.name()); } } @@ -83,6 +95,11 @@ public Builder setPartitionCount(int partitionCount) { return this; } + public Builder setFilterByFields(List filterByFields) { + this.filterByFields = filterByFields; + return this; + } + public Builder setProjectionFields(List projectionFields) { this.projectionFields = projectionFields; return this; @@ -100,16 +117,35 @@ public Map build() { if (partitionCount != null) { viewParams.put(MATERIALIZED_VIEW_PARTITION_COUNT.name(), partitionCount); } + if (!filterByFields.isEmpty()) { + viewParams.put( + MATERIALIZED_VIEW_FILTER_BY_FIELDS.name(), + convertListToStringProperty(filterByFields, MATERIALIZED_VIEW_FILTER_BY_FIELDS.name())); + } if (!projectionFields.isEmpty()) { - try { - viewParams.put( - MATERIALIZED_VIEW_PROJECTION_FIELDS.name(), - ObjectMapperFactory.getInstance().writeValueAsString(projectionFields)); - } catch (JsonProcessingException e) { - throw new VeniceException("Failed to convert the projection fields to a string property", e); - } + viewParams.put( + MATERIALIZED_VIEW_PROJECTION_FIELDS.name(), + convertListToStringProperty(projectionFields, MATERIALIZED_VIEW_PROJECTION_FIELDS.name())); } return viewParams; } } + + public static String convertListToStringProperty(List list, String propertyName) { + try { + return ObjectMapperFactory.getInstance().writeValueAsString(list); + } catch (JsonProcessingException e) { + throw new VeniceException("Failed to convert list to a string property for property: " + propertyName); + } + } + + public static List parsePropertyStringToList(String propertyString, String propertyName) { + try { + return ObjectMapperFactory.getInstance().readValue(propertyString, List.class); + } catch (JsonProcessingException e) { + throw new VeniceException( + "Failed to parse the property string to list for property: " + propertyName + ", property string: " + + propertyString); + } + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java 
b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java index 7881473a3e0..056951b82aa 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java @@ -393,6 +393,11 @@ static String parseStoreFromKafkaTopicName(String kafkaTopic) { return parseStoreFromStreamReprocessingTopic(kafkaTopic); } else if (isVersionTopic(kafkaTopic)) { return parseStoreFromVersionTopic(kafkaTopic); + } else if (VeniceView.isMaterializedViewTopic(kafkaTopic)) { + // We have store view adapters for metadata repositories in view consumers (CC and DVC) that expects the store + // view name to be in certain format in order to differentiate and retrieve the correct metadata. These adapters + // are generic and can be leveraged by other view types too but as of now only materialized view is using it. + return VeniceView.parseStoreAndViewFromViewTopic(kafkaTopic); } else if (VeniceView.isViewTopic(kafkaTopic)) { return VeniceView.parseStoreFromViewTopic(kafkaTopic); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java index eabce510d9d..20c866ccb3e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java @@ -53,6 +53,13 @@ public ByteBuffer serializeNonChunkedKey(ByteBuffer key) { return serialize(key, serializedNonChunkKeySuffix); } + public byte[] extractKeyFromNonChunkedKeySuffix(byte[] keyBytesWithNonChunkedKeySuffix) { + int keyLength = keyBytesWithNonChunkedKeySuffix.length - serializedNonChunkKeySuffix.length; + byte[] keyBytes = new byte[keyLength]; + System.arraycopy(keyBytesWithNonChunkedKeySuffix, 0, keyBytes, 0, keyLength); + return keyBytes; + } + private ByteBuffer serialize(ByteBuffer key, byte[] encodedChunkedKeySuffix) { /** * Here will always allocate a new {@link ByteBuffer} to accommodate} the combination of key and chunked diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java index f82011349a6..9596aa1ae78 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java @@ -15,6 +15,7 @@ import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.writer.VeniceWriterOptions; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -29,6 +30,7 @@ public class MaterializedView extends VeniceView { private final int viewPartitionCount; private final PartitionerConfig partitionerConfig; private Lazy viewPartitioner; + private final List filterByFields; public MaterializedView(Properties props, String storeName, Map viewParameters) { super(props, storeName, viewParameters); @@ -46,6 +48,15 @@ public MaterializedView(Properties props, String storeName, Map Map viewPartitionerParamsMap = PartitionUtils.getPartitionerParamsMap(viewPartitionerParamsString); this.partitionerConfig = new PartitionerConfigImpl(viewPartitionerClass, viewPartitionerParamsMap, DEFAULT_AMP_FACTOR); + String filterByFieldsString = + 
viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_FILTER_BY_FIELDS.name()); + if (filterByFieldsString != null) { + filterByFields = MaterializedViewParameters.parsePropertyStringToList( + filterByFieldsString, + MaterializedViewParameters.MATERIALIZED_VIEW_FILTER_BY_FIELDS.name()); + } else { + filterByFields = Collections.emptyList(); + } } @Override @@ -148,4 +159,17 @@ public String getViewName() { public PartitionerConfig getPartitionerConfig() { return partitionerConfig; } + + public String getProjectionSchema() { + return viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_SCHEMA.name()); + } + + public boolean isValueProviderNeeded() { + return getViewPartitioner().getPartitionerType() == VenicePartitioner.VenicePartitionerType.COMPLEX + || getProjectionSchema() != null; + } + + public List getFilterByFields() { + return filterByFields; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java index b0debb798a9..02f78ea5bcd 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java @@ -32,6 +32,7 @@ public abstract class VeniceView { public static final String VIEW_STORE_PREFIX = "view_store_"; public static final String VIEW_NAME_SEPARATOR = "_"; + public static final String PROJECTION_SCHEMA_SUFFIX = "projection"; protected final Properties props; protected final String storeName; protected final Map viewParameters; @@ -105,6 +106,10 @@ public static String parseStoreFromViewTopic(String topicName) { return topicName.substring(0, Version.getLastIndexOfVersionSeparator(topicName)); } + public static boolean isMaterializedViewTopic(String topicName) { + return topicName.endsWith(MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX); + } + // TODO: see above TODO for isViewTopic function, same applies here public static int parseVersionFromViewTopic(String topicName) { if (!isViewTopic(topicName)) { @@ -170,4 +175,8 @@ public static String getStoreName(String storeName) { return storeName; } } + + public static String getProjectionSchemaName(String storeName, String viewName) { + return storeName + VIEW_NAME_SEPARATOR + viewName + PROJECTION_SCHEMA_SUFFIX; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java index 36f7c0a6b27..2bb3dc71999 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java @@ -5,19 +5,30 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.venice.compression.VeniceCompressor; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.pubsub.api.PubSubMessageHeader; import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; +import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; +import com.linkedin.venice.serializer.RecordSerializer; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.ReflectUtils; import 
com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.lazy.Lazy; +import com.linkedin.venice.writer.ComplexVeniceWriter; import java.io.IOException; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; public class ViewUtils { @@ -111,4 +122,103 @@ public static PubSubMessageHeader getViewDestinationPartitionHeader( throw new VeniceException("Failed to serialize view destination partition map", e); } } + + /** + * This uses {@link com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper} for field extraction. Therefore it + * has the same limitations as regular read compute where for Avro-1.9 or above we cannot guarantee the extracted + * field will be exactly the same as the existing field in terms of default value, since the extracted default value + * will be in Java format. This shouldn't be an issue for most use cases, but it's something worth keeping in mind. + */ + public static String validateAndGenerateProjectionSchema( + Schema latestValueSchema, + Set projectionFields, + String generatedSchemaName) { + if (latestValueSchema == null) { + throw new IllegalArgumentException("Latest value schema cannot be null"); + } + List projectionSchemaFields = new LinkedList<>(); + + for (String fieldName: projectionFields) { + Schema.Field field = latestValueSchema.getField(fieldName); + if (field == null) { + throw new VeniceException("Field: " + fieldName + " does not exist in latest value schema"); + } + if (!field.hasDefaultValue()) { + throw new VeniceException("Default value is required for field: " + fieldName); + } + projectionSchemaFields.add(AvroCompatibilityHelper.newField(field).setDoc("").build()); + } + + Schema generatedProjectionSchema = + Schema.createRecord(generatedSchemaName, "", latestValueSchema.getNamespace(), false); + generatedProjectionSchema.setFields(projectionSchemaFields); + return generatedProjectionSchema.toString(); + } + + public static void validateFilterByFields(Schema latestValueSchema, Set filterByFields) { + if (latestValueSchema == null) { + throw new IllegalArgumentException("Latest value schema cannot be null"); + } + for (String fieldName: filterByFields) { + Schema.Field field = latestValueSchema.getField(fieldName); + if (field == null) { + throw new VeniceException("Field: " + fieldName + " does not exist in latest value schema"); + } + } + } + + public static void project(GenericRecord inputRecord, GenericRecord resultRecord) { + Schema.Field inputRecordField; + for (Schema.Field field: resultRecord.getSchema().getFields()) { + inputRecordField = inputRecord.getSchema().getField(field.name()); + if (inputRecordField != null) { + resultRecord.put(field.pos(), inputRecord.get(inputRecordField.pos())); + } + } + } + + /** + * @param oldValue of the record + * @param newValue of the record + * @param filterByFields to perform change filter on + * @return the filter decision, i.e. true means keep the record and false means it should be filtered out.
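A minimal, self-contained illustration of the keep/filter contract described above, written against plain Avro; the schema and field names are made up for the example and the real decision is taken by the changeFilter helper that follows:

import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public final class ChangeFilterIllustration {
  private static final Schema VALUE_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Value\",\"fields\":["
          + "{\"name\":\"firstName\",\"type\":\"string\",\"default\":\"\"},"
          + "{\"name\":\"age\",\"type\":\"int\",\"default\":-1}]}");

  public static void main(String[] args) {
    GenericRecord oldValue = new GenericData.Record(VALUE_SCHEMA);
    oldValue.put("firstName", "jane");
    oldValue.put("age", 30);
    GenericRecord newValue = new GenericData.Record(VALUE_SCHEMA);
    newValue.put("firstName", "janet"); // changed, but firstName is not a filter-by field
    newValue.put("age", 30);            // the watched field is unchanged

    boolean keep = false;
    for (String fieldName: Collections.singletonList("age")) {
      Schema fieldSchema = VALUE_SCHEMA.getField(fieldName).schema();
      if (GenericData.get().compare(oldValue.get(fieldName), newValue.get(fieldName), fieldSchema) != 0) {
        keep = true; // at least one filter-by field changed, so the record is kept
        break;
      }
    }
    System.out.println(keep); // false: only firstName changed, so this update would be skipped for the view
  }
}

Only the watched field (age) is compared, so the update above would not be forwarded to the view even though firstName changed.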
+ */ + public static boolean changeFilter( + GenericRecord oldValue, + GenericRecord newValue, + List filterByFields, + String viewName) { + if (oldValue == null) { + return true; + } + if (newValue == null) { + throw new VeniceException("Cannot perform filter because new value is null for view: " + viewName); + } + boolean changed = false; + for (String fieldName: filterByFields) { + Schema fieldSchema = oldValue.getSchema().getField(fieldName).schema(); + if (GenericData.get().compare(oldValue.get(fieldName), newValue.get(fieldName), fieldSchema) != 0) { + changed = true; + break; + } + } + return changed; + } + + public static void configureWriterForProjection( + ComplexVeniceWriter complexVeniceWriter, + String viewName, + Lazy compressor, + String projectionSchemaString) { + Schema projectionSchema = new Schema.Parser().parse(projectionSchemaString); + Lazy> serializer = + Lazy.of(() -> FastSerializerDeserializerFactory.getFastAvroGenericSerializer(projectionSchema)); + complexVeniceWriter.configureWriterForProjection(projectionSchemaString, (projectionRecord) -> { + try { + return compressor.get().compress(serializer.get().serialize(projectionRecord)); + } catch (IOException e) { + throw new VeniceException("Projection failed due to compression error for view: " + viewName, e); + } + }); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java index f139b1fb531..a209368bc68 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java @@ -1,23 +1,23 @@ package com.linkedin.venice.writer; import com.linkedin.venice.exceptions.VeniceException; -import com.linkedin.venice.kafka.protocol.Put; -import com.linkedin.venice.kafka.protocol.enums.MessageType; -import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.partitioner.ComplexVenicePartitioner; import com.linkedin.venice.partitioner.VenicePartitioner; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerAdapter; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; import com.linkedin.venice.storage.protocol.ChunkedValueManifest; +import com.linkedin.venice.utils.AvroRecordUtils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.VeniceView; -import java.util.Set; +import com.linkedin.venice.views.ViewUtils; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Function; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -29,6 +29,8 @@ public class ComplexVeniceWriter extends VeniceWriter { private final ComplexVenicePartitioner complexPartitioner; private final String viewName; + private GenericRecord reusableProjectionRecord = null; + private Function projectionSerializerAndCompressor; public ComplexVeniceWriter( VeniceWriterOptions params, @@ -45,6 +47,14 @@ public ComplexVeniceWriter( VeniceView.getViewNameFromViewStoreName(VeniceView.parseStoreAndViewFromViewTopic(params.getTopicName())); } + public void configureWriterForProjection( + String projectionSchemaString, + Function projectionSerializerAndCompressor) { + Schema schema = new 
Schema.Parser().parse(projectionSchemaString); + this.reusableProjectionRecord = new GenericData.Record(schema); + this.projectionSerializerAndCompressor = projectionSerializerAndCompressor; + } + public CompletableFuture complexPut(K key, V value, int valueSchemaId, Lazy valueProvider) { return complexPut(key, value, valueSchemaId, valueProvider, null, null, null); } @@ -64,14 +74,26 @@ public CompletableFuture complexPut( PutMetadata putMetadata) { CompletableFuture finalCompletableFuture = new CompletableFuture<>(); if (value == null) { - // Ignore null value throw new VeniceException("Put value should not be null"); } else { + V finalValue; + if (reusableProjectionRecord != null) { + AvroRecordUtils.clearRecord(reusableProjectionRecord); + GenericRecord record = valueProvider.get(); + if (record == null) { + throw new VeniceException("Projection failed due to unexpected null value for view: " + getViewName()); + } + ViewUtils.project(record, reusableProjectionRecord); + finalValue = projectionSerializerAndCompressor.apply(reusableProjectionRecord); + } else { + // No projection + finalValue = value; + } // Write updated/put record to materialized view topic partition(s) byte[] serializedKey = keySerializer.serialize(topicName, key); if (complexPartitioner == null) { // No VeniceComplexPartitioner involved, perform simple put. - byte[] serializedValue = valueSerializer.serialize(topicName, value); + byte[] serializedValue = valueSerializer.serialize(topicName, finalValue); int partition = getPartition(serializedKey); propagateVeniceWriterFuture( put(serializedKey, serializedValue, partition, valueSchemaId, callback, putMetadata), @@ -84,7 +106,7 @@ public CompletableFuture complexPut( if (partitions.length == 0) { finalCompletableFuture.complete(null); } else { - byte[] serializedValue = valueSerializer.serialize(topicName, value); + byte[] serializedValue = valueSerializer.serialize(topicName, finalValue); performMultiPartitionAction( partitions, finalCompletableFuture, @@ -98,38 +120,6 @@ public CompletableFuture complexPut( return finalCompletableFuture; } - /** - * Used during NR pass-through in remote region to forward records or chunks of records to corresponding view - * partition based on provided view partition map. This way the producing leader don't need to worry about large - * record assembly or chunking for view topic(s) when ingesting from source VT during NR pass-through. It's also - * expected to receive an empty partition set and in which case it's a no-op and we simply return a completed future. - * This is a valid use case since certain complex partitioner implementation could filter out records based on value - * fields and return an empty partition. 
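A rough, standalone sketch of the projection path the put flow above implements, using plain Avro and java.util.zip as stand-ins for Venice's fast serializer and the store's configured VeniceCompressor; the schemas, field names, and GZIP choice are assumptions made for the example:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public final class ProjectionPutSketch {
  private static final Schema VALUE_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Value\",\"fields\":["
          + "{\"name\":\"firstName\",\"type\":\"string\",\"default\":\"\"},"
          + "{\"name\":\"lastName\",\"type\":\"string\",\"default\":\"\"},"
          + "{\"name\":\"age\",\"type\":\"int\",\"default\":-1}]}");
  private static final Schema PROJECTION_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"ValueProjection\",\"fields\":["
          + "{\"name\":\"firstName\",\"type\":\"string\",\"default\":\"\"}]}");

  public static void main(String[] args) throws IOException {
    GenericRecord fullValue = new GenericData.Record(VALUE_SCHEMA);
    fullValue.put("firstName", "jane");
    fullValue.put("lastName", "doe");
    fullValue.put("age", 42);

    // Step 1: copy only the projected fields, the same idea as ViewUtils.project.
    GenericRecord projected = new GenericData.Record(PROJECTION_SCHEMA);
    for (Schema.Field field: PROJECTION_SCHEMA.getFields()) {
      Schema.Field inputField = VALUE_SCHEMA.getField(field.name());
      if (inputField != null) {
        projected.put(field.pos(), fullValue.get(inputField.pos()));
      }
    }

    // Step 2: serialize the projected record and compress the resulting bytes.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(gzip, null);
      new GenericDatumWriter<GenericRecord>(PROJECTION_SCHEMA).write(projected, encoder);
      encoder.flush();
    }
    System.out.println("projected " + projected + " into " + out.size() + " compressed bytes");
  }
}

In the writer itself the projection record is reused across puts (reusableProjectionRecord is cleared before each use), and the serialize-then-compress step is the function supplied through ViewUtils.configureWriterForProjection.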
- */ - public CompletableFuture forwardPut(K key, V value, int valueSchemaId, Set partitions) { - if (partitions.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - byte[] serializedKey = keySerializer.serialize(topicName, key); - byte[] serializedValue = valueSerializer.serialize(topicName, value); - KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, serializedKey); - Put putPayload = buildPutPayload(serializedValue, valueSchemaId, null); - int[] partitionArray = partitions.stream().mapToInt(i -> i).toArray(); - CompletableFuture finalCompletableFuture = new CompletableFuture<>(); - performMultiPartitionAction( - partitionArray, - finalCompletableFuture, - (partition) -> sendMessage( - producerMetadata -> kafkaKey, - MessageType.PUT, - putPayload, - partition, - null, - DEFAULT_LEADER_METADATA_WRAPPER, - APP_DEFAULT_LOGICAL_TS)); - return finalCompletableFuture; - } - /** * Perform "delete" on the given key. If a {@link ComplexVenicePartitioner} is involved then it will be a best effort * attempt to delete the record using the valueProvider. It's best effort because: diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java index a8ec09d79af..7cd230e7cfc 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java @@ -37,6 +37,7 @@ import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.integration.utils.D2TestUtils; import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; @@ -74,11 +75,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; import org.apache.samza.system.SystemProducer; import org.testng.Assert; @@ -178,11 +181,15 @@ public void testLFIngestionWithMaterializedView() throws IOException { TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); rePushProps.setProperty(SOURCE_KAFKA, "true"); rePushProps.setProperty(KAFKA_INPUT_BROKER_URL, childDatacenters.get(0).getPubSubBrokerWrapper().getAddress()); - TestWriteUtils.runPushJob("Run push job", rePushProps); + TestWriteUtils.runPushJob("Run re-push job", rePushProps); String rePushViewTopicName = Version.composeKafkaTopic(storeName, 2) + VIEW_NAME_SEPARATOR + testViewName + MATERIALIZED_VIEW_TOPIC_SUFFIX; String rePushVersionTopicName = Version.composeKafkaTopic(storeName, 2); validateViewTopicAndVersionTopic(rePushViewTopicName, rePushVersionTopicName, 6, 3, 100); + + // Perform another push v3 to ensure view resources for v1 are cleaned up + TestWriteUtils.runPushJob("Run push job v3", props); + verifyVersionAndViewTopicDeletion(viewTopicName, versionTopicName); } } @@ -224,13 +231,7 @@ public void 
testBatchOnlyMaterializedViewDVCConsumer() throws IOException, Execu TestWriteUtils.runPushJob("Run push job", props); // Start a DVC client that's subscribed to partition 0 of the store's materialized view. The DVC client should // contain all data records. - VeniceProperties backendConfig = - new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) - .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) - .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) - .put(ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES, 2 * 1024 * 1024L) - .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) - .build(); + VeniceProperties backendConfig = getBasicBackendConfigForDVC(); DaVinciConfig daVinciConfig = new DaVinciConfig(); // Use non-source fabric region to also verify NR for materialized view. D2Client daVinciD2RemoteFabric = D2TestUtils @@ -285,13 +286,7 @@ public void testBatchOnlyMaterializedViewDVCConsumer() throws IOException, Execu D2ClientUtils.shutdownClient(daVinciD2RemoteFabric); } // Make sure things work in the source fabric too. - VeniceProperties newBackendConfig = - new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) - .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) - .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) - .put(ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES, 2 * 1024 * 1024L) - .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) - .build(); + VeniceProperties newBackendConfig = getBasicBackendConfigForDVC(); D2Client daVinciD2SourceFabric = D2TestUtils .getAndStartD2Client(multiRegionMultiClusterWrapper.getChildRegions().get(1).getZkServerWrapper().getAddress()); try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( @@ -353,13 +348,7 @@ public void testLargeValuePushMaterializedViewDVCConsumer() TestWriteUtils.runPushJob("Run push job", props); // Start a DVC client that's subscribed to all partitions of the store's materialized view. The DVC client should // contain all data records. - VeniceProperties backendConfig = - new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) - .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) - .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) - .put(ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES, 2 * 1024 * 1024L) - .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) - .build(); + VeniceProperties backendConfig = getBasicBackendConfigForDVC(); DaVinciConfig daVinciConfig = new DaVinciConfig(); // Use non-source fabric region to also verify NR for materialized view and chunks forwarding. D2Client daVinciD2RemoteFabric = D2TestUtils @@ -452,15 +441,15 @@ public void testLargeValuePushMaterializedViewCCConsumer() polledChangeEvents.put(key, afterImageEvent); } Assert.assertEquals(polledChangeEvents.size(), numberOfRecords); + viewTopicConsumer.close(); } /** - * Verification of the produced records is difficult because we don't really support complex partitioner in the - * read path. Once CC with views is supported we should use CC to verify. Perform re-push to ensure we can deserialize - * value properly during re-push. + * Perform re-push to ensure we can deserialize value properly during re-push. 
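For orientation on the test below: TestValueBasedPartitioner routes each record by a value field. Judging from the assertions and comments in this test, the batch-pushed records (default age of -1) land in no view partition at all, ages 1-9 fan out to every view partition, and ages 10-19 go to age % numPartitions. The following is a hypothetical sketch of that kind of value-based routing decision, not the test partitioner's actual code:

import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;

public final class ValueBasedRoutingSketch {
  // Returns the view partitions a record should be written to, based on its "age" field.
  public static int[] route(GenericRecord value, int numPartitions) {
    int age = (int) value.get("age");
    if (age < 0) {
      return new int[0];                                  // no partitions: the record is effectively filtered out
    }
    if (age < 10) {
      return IntStream.range(0, numPartitions).toArray(); // broadcast to every view partition
    }
    return new int[] { age % numPartitions };             // single partition chosen by modulo
  }
}

The returned partition array plays the same role as the one a ComplexVenicePartitioner hands back to ComplexVeniceWriter.complexPut.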
*/ @Test(timeOut = TEST_TIMEOUT) - public void testMaterializedViewWithComplexPartitioner() throws IOException { + public void testMaterializedViewWithComplexPartitioner() + throws IOException, ExecutionException, InterruptedException { File inputDir = getTempDataDirectory(); Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV2Schema(inputDir); String inputDirPath = "file:" + inputDir.getAbsolutePath(); @@ -501,18 +490,33 @@ public void testMaterializedViewWithComplexPartitioner() throws IOException { }); } TestWriteUtils.runPushJob("Run push job", props); - String viewTopicName = - Version.composeKafkaTopic(storeName, 1) + VIEW_NAME_SEPARATOR + testViewName + MATERIALIZED_VIEW_TOPIC_SUFFIX; - // View topic partitions should be mostly empty based on the TestValueBasedPartitioner logic. - int expectedMaxEndOffset = 6; // This may change when we introduce more CMs e.g. heartbeats - for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) { - VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin(); - PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName); - Int2LongMap viewTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(viewPubSubTopic); - for (long endOffset: viewTopicOffsetMap.values()) { - Assert.assertTrue(endOffset <= expectedMaxEndOffset); - } - } + + // View topic partitions should be empty based on the TestValueBasedPartitioner logic. Start a CC consumer in source + // region to verify. + Properties consumerProperties = new Properties(); + consumerProperties.put( + KAFKA_BOOTSTRAP_SERVERS, + multiRegionMultiClusterWrapper.getChildRegions().get(0).getPubSubBrokerWrapper().getAddress()); + ChangelogClientConfig viewChangeLogClientConfig = getChangelogClientConfigForView( + testViewName, + consumerProperties, + multiRegionMultiClusterWrapper.getChildRegions().get(0).getZkServerWrapper().getAddress()); + VeniceChangelogConsumerClientFactory veniceViewChangelogConsumerClientFactory = + new VeniceChangelogConsumerClientFactory(viewChangeLogClientConfig, new MetricsRepository()); + VeniceChangelogConsumer viewTopicConsumer0 = + veniceViewChangelogConsumerClientFactory.getChangelogConsumer(storeName, "partition0"); + VeniceChangelogConsumer viewTopicConsumer1 = + veniceViewChangelogConsumerClientFactory.getChangelogConsumer(storeName, "partition1"); + Assert.assertTrue(viewTopicConsumer0 instanceof VeniceAfterImageConsumerImpl); + Assert.assertTrue(viewTopicConsumer1 instanceof VeniceAfterImageConsumerImpl); + viewTopicConsumer0.subscribe(Collections.singleton(0)).get(); + viewTopicConsumer1.subscribe(Collections.singleton(1)).get(); + Collection, VeniceChangeCoordinate>> pubSubMessages = + viewTopicConsumer0.poll(1000); + Assert.assertEquals(pubSubMessages.size(), 0); + pubSubMessages = viewTopicConsumer1.poll(1000); + Assert.assertEquals(pubSubMessages.size(), 0); + // Perform some partial updates in the non-NR source fabric VeniceClusterWrapper veniceCluster = childDatacenters.get(1).getClusters().get(clusterName); SystemProducer producer = @@ -526,25 +530,355 @@ public void testMaterializedViewWithComplexPartitioner() throws IOException { updateBuilder.setNewFieldValue("age", i); IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, key, updateBuilder.build(), newTimestamp); } + // age 1-9 will be written to all partitions so +9 in p0 and p1 // age 10-19 will be written to % numPartitions which will alternate so +5 in p0 and p1 - int newMinEndOffset = 
expectedMaxEndOffset + 9 + 5; - for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) { - VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin(); - PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName); - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { - Int2LongMap viewTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(viewPubSubTopic); - for (long endOffset: viewTopicOffsetMap.values()) { - Assert.assertTrue(endOffset >= newMinEndOffset); - } - }); + int expectedEventCount = 9 + 5; + Map polledEvents0 = new HashMap<>(); + Map polledEvents1 = new HashMap<>(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + for (PubSubMessage, VeniceChangeCoordinate> pubSubMessage: viewTopicConsumer0 + .poll(1000)) { + polledEvents0.put(pubSubMessage.getKey().toString(), pubSubMessage.getValue().getCurrentValue()); + } + for (PubSubMessage, VeniceChangeCoordinate> pubSubMessage: viewTopicConsumer1 + .poll(1000)) { + polledEvents1.put(pubSubMessage.getKey().toString(), pubSubMessage.getValue().getCurrentValue()); + } + Assert.assertEquals(polledEvents0.size(), expectedEventCount); + Assert.assertEquals(polledEvents1.size(), expectedEventCount); + }); + for (int i = 1; i < 10; i++) { + // These records should exist in both partitions + Assert.assertTrue(polledEvents0.containsKey(Integer.toString(i))); + Assert.assertTrue(polledEvents1.containsKey(Integer.toString(i))); + } + for (int i = 10; i < 20; i++) { + // Alternate between p0 and p1 + if (i % 2 == 0) { + Assert.assertTrue(polledEvents0.containsKey(Integer.toString(i))); + } else { + Assert.assertTrue(polledEvents1.containsKey(Integer.toString(i))); + } } + // A re-push should succeed Properties rePushProps = TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); rePushProps.setProperty(SOURCE_KAFKA, "true"); rePushProps.setProperty(KAFKA_INPUT_BROKER_URL, childDatacenters.get(0).getPubSubBrokerWrapper().getAddress()); TestWriteUtils.runPushJob("Run push job", rePushProps); + + viewTopicConsumer0.close(); + viewTopicConsumer1.close(); + } + + /** + * Test materialized view with projection using change log and DVC consumer. 
+ */ + @Test(timeOut = TEST_TIMEOUT) + public void testMaterializedViewWithProjectionUsingCCAndDVCConsumer() + throws IOException, ExecutionException, InterruptedException { + File inputDir = getTempDataDirectory(); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV2Schema(inputDir); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("complexPartitionStore"); + Properties props = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(false) + .setChunkingEnabled(true) + .setCompressionStrategy(CompressionStrategy.GZIP) + .setRmdChunkingEnabled(true) + .setNativeReplicationEnabled(true) + .setNativeReplicationSourceFabric(childDatacenters.get(0).getRegionName()) + .setPartitionCount(3) + .setActiveActiveReplicationEnabled(true) + .setWriteComputationEnabled(true) + .setHybridRewindSeconds(10L) + .setHybridOffsetLagThreshold(2L); + String testViewName = "projectionView"; + String projectionFieldName = "firstName"; + try (ControllerClient controllerClient = + IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) { + MaterializedViewParameters.Builder viewParamBuilder = + new MaterializedViewParameters.Builder(testViewName).setPartitionCount(1) + .setProjectionFields(Collections.singletonList(projectionFieldName)); + UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName) + .setViewClassName(MaterializedView.class.getCanonicalName()) + .setViewClassParams(viewParamBuilder.build()); + controllerClient + .retryableRequest(5, controllerClient1 -> controllerClient.updateStore(storeName, updateViewParam)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, () -> { + Map viewConfigMap = controllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 1); + Assert.assertEquals( + viewConfigMap.get(testViewName).getViewClassName(), + MaterializedView.class.getCanonicalName()); + }); + } + TestWriteUtils.runPushJob("Run push job", props); + + // Verify projection with change log consumer in all regions + String firstNamePrefix = "first_name_"; + for (VeniceMultiClusterWrapper region: multiRegionMultiClusterWrapper.getChildRegions()) { + Map polledEvents = getChangeLogConsumerAndPollEvents( + storeName, + testViewName, + region.getPubSubBrokerWrapper().getAddress(), + region.getZkServerWrapper().getAddress(), + 100); + // Check the projection result + for (int i = 1; i <= 100; i++) { + GenericRecord record = polledEvents.get(Integer.toString(i)); + Assert.assertNotNull(record); + Assert.assertEquals(record.get(projectionFieldName).toString(), firstNamePrefix + i); + Assert.assertEquals(record.getSchema().getFields().size(), 1); + } + } + + // Verify projection with DVC in all regions + for (VeniceMultiClusterWrapper region: multiRegionMultiClusterWrapper.getChildRegions()) { + VeniceProperties backendConfig = getBasicBackendConfigForDVC(); + DaVinciConfig daVinciConfig = new DaVinciConfig(); + D2Client daVinciD2RemoteFabric = D2TestUtils.getAndStartD2Client(region.getZkServerWrapper().getAddress()); + try (CachingDaVinciClientFactory 
factory = new CachingDaVinciClientFactory( + daVinciD2RemoteFabric, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + new MetricsRepository(), + backendConfig)) { + DaVinciClient viewClient = + factory.getAndStartGenericAvroClient(storeName, testViewName, daVinciConfig); + viewClient.subscribeAll().get(); + for (int i = 1; i <= 100; i++) { + GenericRecord record = (GenericRecord) viewClient.get(Integer.toString(i)).get(); + Assert.assertNotNull(record); + Assert.assertEquals(record.get(projectionFieldName).toString(), firstNamePrefix + i); + Assert.assertEquals(record.getSchema().getFields().size(), 1); + } + } finally { + D2ClientUtils.shutdownClient(daVinciD2RemoteFabric); + } + } + } + + @Test(timeOut = TEST_TIMEOUT) + public void testMaterializedViewWithFilterAndProjectionUsingCCConsumer() + throws IOException, ExecutionException, InterruptedException { + File inputDir = getTempDataDirectory(); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV2Schema(inputDir); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("complexPartitionStore"); + Properties props = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + // Use an A/A W/C enabled store to verify correct partitioning after partial update is applied. + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(false) + .setChunkingEnabled(true) + .setCompressionStrategy(CompressionStrategy.GZIP) + .setRmdChunkingEnabled(true) + .setNativeReplicationEnabled(true) + .setNativeReplicationSourceFabric(childDatacenters.get(0).getRegionName()) + .setPartitionCount(3) + .setActiveActiveReplicationEnabled(true) + .setWriteComputationEnabled(true) + .setHybridRewindSeconds(10L) + .setHybridOffsetLagThreshold(2L); + String testViewName = "filterProjectionView"; + String projectionFieldName = "firstName"; + String filterByFieldName = "age"; + try (ControllerClient controllerClient = + IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) { + MaterializedViewParameters.Builder viewParamBuilder = + new MaterializedViewParameters.Builder(testViewName).setPartitionCount(1) + .setProjectionFields(Collections.singletonList(projectionFieldName)) + .setFilterByFields(Collections.singletonList(filterByFieldName)); + UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName) + .setViewClassName(MaterializedView.class.getCanonicalName()) + .setViewClassParams(viewParamBuilder.build()); + controllerClient + .retryableRequest(5, controllerClient1 -> controllerClient.updateStore(storeName, updateViewParam)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, () -> { + Map viewConfigMap = controllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 1); + Assert.assertEquals( + viewConfigMap.get(testViewName).getViewClassName(), + MaterializedView.class.getCanonicalName()); + }); + } + TestWriteUtils.runPushJob("Run push job", props); + // Verify result with change log consumer in all regions + String firstNamePrefix = "first_name_"; + Map> consumerMap = new HashMap<>(); + for (VeniceMultiClusterWrapper region: 
multiRegionMultiClusterWrapper.getChildRegions()) { + VeniceChangelogConsumer consumer = getChangeLogConsumer( + storeName, + testViewName, + region.getPubSubBrokerWrapper().getAddress(), + region.getZkServerWrapper().getAddress()); + consumer.subscribeAll().get(); + consumerMap.put(region.getRegionName(), consumer); + Map polledEvents = pollEvents(consumer, 100); + // Check the projection and filter results. Nothing should be filtered from the batch push. + for (int i = 1; i <= 100; i++) { + GenericRecord record = polledEvents.get(Integer.toString(i)); + Assert.assertNotNull(record); + Assert.assertEquals(record.getSchema().getFields().size(), 2); + Assert.assertEquals(record.get(projectionFieldName).toString(), firstNamePrefix + i); + Assert.assertEquals(record.get(filterByFieldName), -1); + } + } + + // Perform some partial updates in the non-NR source fabric + String updatedFirstNamePrefix = "updated_first_name_"; + VeniceClusterWrapper veniceCluster = childDatacenters.get(1).getClusters().get(clusterName); + SystemProducer producer = + IntegrationTestPushUtils.getSamzaProducer(veniceCluster, storeName, Version.PushType.STREAM); + Schema partialUpdateSchema = WriteComputeSchemaConverter.getInstance() + .convertFromValueRecordSchema(recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema()); + long newTimestamp = 100000L; + int expectedFilteredEvents = 0; + for (int i = 1; i < 20; i++) { + String key = Integer.toString(i); + UpdateBuilder updateBuilder = new UpdateBuilderImpl(partialUpdateSchema); + if (i % 2 == 0) { + // only update age for some keys + updateBuilder.setNewFieldValue(filterByFieldName, i); + expectedFilteredEvents++; + } + updateBuilder.setNewFieldValue(projectionFieldName, updatedFirstNamePrefix + i); + IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, key, updateBuilder.build(), newTimestamp); + } + for (VeniceChangelogConsumer consumer: consumerMap.values()) { + Map polledEvents = pollEvents(consumer, expectedFilteredEvents); + for (int i = 1; i < 20; i++) { + if (i % 2 == 0) { + GenericRecord record = polledEvents.get(Integer.toString(i)); + Assert.assertNotNull(record); + Assert.assertEquals(record.getSchema().getFields().size(), 2); + Assert.assertEquals(record.get(projectionFieldName).toString(), updatedFirstNamePrefix + i); + Assert.assertEquals(record.get(filterByFieldName), i); + } + } + consumer.close(); + } + } + + @Test(timeOut = TEST_TIMEOUT) + public void testFailedMaterializedViewPushCanBeCleanedUp() throws IOException { + // Create a store with materialized view and start a batch push + File inputDir = getTempDataDirectory(); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("batchStore"); + Properties props = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(false) + .setChunkingEnabled(true) + .setRmdChunkingEnabled(true) + .setNativeReplicationEnabled(true) + .setNativeReplicationSourceFabric(childDatacenters.get(0).getRegionName()) + .setPartitionCount(3); + String testViewName = "MaterializedViewTest"; + try (ControllerClient controllerClient = + 
IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) { + MaterializedViewParameters.Builder viewParamBuilder = + new MaterializedViewParameters.Builder(testViewName).setPartitionCount(1); + UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName) + .setViewClassName(MaterializedView.class.getCanonicalName()) + .setViewClassParams(viewParamBuilder.build()); + controllerClient + .retryableRequest(5, controllerClient1 -> controllerClient.updateStore(storeName, updateViewParam)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, () -> { + Map viewConfigMap = controllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 1); + Assert.assertEquals( + viewConfigMap.get(testViewName).getViewClassName(), + MaterializedView.class.getCanonicalName()); + }); + VersionCreationResponse versionCreationResponse = controllerClient.requestTopicForWrites( + storeName, + 1000, + Version.PushType.BATCH, + "controller-client-started-test-push", + true, + true, + false, + Optional.empty(), + Optional.empty(), + Optional.empty(), + false, + -1); + Assert.assertFalse(versionCreationResponse.isError()); + int newVersion = versionCreationResponse.getVersion(); + String versionTopicName = Version.composeKafkaTopic(storeName, newVersion); + String viewTopicName = Version.composeKafkaTopic(storeName, newVersion) + VIEW_NAME_SEPARATOR + testViewName + + MATERIALIZED_VIEW_TOPIC_SUFFIX; + // Wait and verify the resources are created + for (VeniceMultiClusterWrapper veniceMultiClusterWrapper: childDatacenters) { + VeniceHelixAdmin admin = veniceMultiClusterWrapper.getRandomController().getVeniceHelixAdmin(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + Assert.assertTrue(admin.getStore(clusterName, storeName).containsVersion(newVersion)); + Assert.assertTrue( + admin.getTopicManager().containsTopic(admin.getPubSubTopicRepository().getTopic(versionTopicName))); + Assert.assertTrue( + admin.getTopicManager().containsTopic(admin.getPubSubTopicRepository().getTopic(viewTopicName))); + }); + } + // Kill the push job and ensure resources are cleaned up + controllerClient.killOfflinePushJob(versionTopicName); + verifyVersionAndViewTopicDeletion(viewTopicName, versionTopicName); + } + } + + private Map getChangeLogConsumerAndPollEvents( + String storeName, + String viewName, + String pubSubBrokerAddress, + String d2ZkAddress, + int expectedEventsCount) throws ExecutionException, InterruptedException { + VeniceChangelogConsumer viewTopicConsumer = null; + try { + viewTopicConsumer = getChangeLogConsumer(storeName, viewName, pubSubBrokerAddress, d2ZkAddress); + viewTopicConsumer.subscribeAll().get(); + return pollEvents(viewTopicConsumer, expectedEventsCount); + } finally { + if (viewTopicConsumer != null) { + viewTopicConsumer.close(); + } + } + } + + private VeniceChangelogConsumer getChangeLogConsumer( + String storeName, + String viewName, + String pubSubBrokerAddress, + String d2ZkAddress) { + Properties consumerProperties = new Properties(); + consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerAddress); + ChangelogClientConfig viewChangeLogClientConfig = + getChangelogClientConfigForView(viewName, consumerProperties, d2ZkAddress); + VeniceChangelogConsumerClientFactory veniceViewChangelogConsumerClientFactory = + new VeniceChangelogConsumerClientFactory(viewChangeLogClientConfig, new MetricsRepository()); + return 
veniceViewChangelogConsumerClientFactory.getChangelogConsumer(storeName); + } + + private Map pollEvents( + VeniceChangelogConsumer viewTopicConsumer, + int expectedEventsCount) { + Map polledEvents = new HashMap<>(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + for (PubSubMessage, VeniceChangeCoordinate> pubSubMessage: viewTopicConsumer + .poll(1000)) { + polledEvents.put(pubSubMessage.getKey().toString(), pubSubMessage.getValue().getCurrentValue()); + } + Assert.assertEquals(polledEvents.size(), expectedEventsCount); + }); + return polledEvents; } private double getMetric(MetricsRepository metricsRepository, String metricName, String storeName) { @@ -584,4 +918,46 @@ private void validateViewTopicAndVersionTopic( Assert.assertTrue(records > minRecordCount, "View topic records size: " + records); } } + + private void verifyVersionAndViewTopicDeletion(String viewTopicName, String versionTopicName) { + String storeName = Version.parseStoreFromVersionTopic(versionTopicName); + int versionNumber = Version.parseVersionFromVersionTopicName(versionTopicName); + for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) { + VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + Assert.assertFalse(admin.getStore(clusterName, storeName).containsVersion(versionNumber)); + }); + PubSubTopic versionPubSubTopic = admin.getPubSubTopicRepository().getTopic(versionTopicName); + if (admin.getTopicManager().containsTopic(versionPubSubTopic)) { + Assert.assertTrue(admin.isTopicTruncated(versionTopicName)); + } + PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName); + if (admin.getTopicManager().containsTopic(viewPubSubTopic)) { + Assert.assertTrue(admin.isTopicTruncated(viewTopicName)); + } + } + } + + private ChangelogClientConfig getChangelogClientConfigForView( + String viewName, + Properties consumerProperties, + String localD2ZkHost) { + return new ChangelogClientConfig().setViewName(viewName) + .setConsumerProperties(consumerProperties) + .setControllerD2ServiceName(D2_SERVICE_NAME) + .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .setLocalD2ZkHosts(localD2ZkHost) + .setVersionSwapDetectionIntervalTimeInSeconds(3L) + .setControllerRequestRetryCount(3) + .setBootstrapFileSystemPath(getTempDataDirectory().getAbsolutePath()); + } + + private VeniceProperties getBasicBackendConfigForDVC() { + return new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) + .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) + .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES, 2 * 1024 * 1024L) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .build(); + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java index b6e18d2e93b..0c91d9c35b6 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java @@ -13,7 +13,6 @@ import com.linkedin.venice.utils.lazy.Lazy; import java.nio.ByteBuffer; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; import 
org.apache.avro.generic.GenericRecord; @@ -40,7 +39,8 @@ public CompletableFuture processRecord( int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord, - Lazy valueProvider) { + Lazy valueProvider, + Lazy oldValueProvider) { internalView.incrementRecordCount(storeName); return CompletableFuture.completedFuture(null); @@ -51,8 +51,8 @@ public CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - Set viewPartitionSet, - Lazy newValueProvider) { + Lazy newValueProvider, + Lazy oldValueProvider) { internalView.incrementRecordCount(storeName); return CompletableFuture.completedFuture(null); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 29a7462c922..cfe8fc50ba7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -5765,20 +5765,23 @@ static Map mergeNewViewConfigsIntoOldConfigs( static Map addNewViewConfigsIntoOldConfigs( Store oldStore, - String viewClass, + String viewName, ViewConfig viewConfig) throws VeniceException { - // Add new view config into the existing config map. The new configs will override existing ones which share the - // same key. + // Add new view config into the existing config map. For the safety of downstream consumers we do not allow in place + // updates to an existing view. Updates should be made by creating a new view and removing the old view. Map oldViewConfigMap = oldStore.getViewConfigs(); if (oldViewConfigMap == null) { oldViewConfigMap = new HashMap<>(); + } else if (oldViewConfigMap.containsKey(viewName)) { + throw new VeniceException( + "We do not support in place update of view configs for the safety of downstream consumers"); } Map mergedConfigs = StoreViewUtils.convertViewConfigMapToStoreViewRecordMap(oldViewConfigMap); StoreViewConfigRecord newStoreViewConfigRecord = StoreViewUtils.convertViewConfigToStoreViewConfigRecord(viewConfig); - mergedConfigs.put(viewClass, newStoreViewConfigRecord); + mergedConfigs.put(viewName, newStoreViewConfigRecord); return mergedConfigs; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 59cd1f22492..e9f4e424d3c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -191,6 +191,7 @@ import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.PartitionerConfig; +import com.linkedin.venice.meta.ReadWriteSchemaRepository; import com.linkedin.venice.meta.ReadWriteStoreRepository; import com.linkedin.venice.meta.RegionPushDetails; import com.linkedin.venice.meta.RoutersClusterConfig; @@ -244,6 +245,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.views.MaterializedView; import 
com.linkedin.venice.views.VeniceView; @@ -2423,7 +2425,8 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa } // If View parameter is not provided, use emtpy map instead. It does not inherit from existing config. ViewConfig viewConfig = new ViewConfigImpl(viewClassName.get(), viewParams.orElse(Collections.emptyMap())); - ViewConfig validatedViewConfig = validateAndDecorateStoreViewConfig(currStore, viewConfig, viewName.get()); + ViewConfig validatedViewConfig = + validateAndDecorateStoreViewConfig(clusterName, currStore, viewConfig, viewName.get()); updatedViewSettings = VeniceHelixAdmin.addNewViewConfigsIntoOldConfigs(currStore, viewName.get(), validatedViewConfig); } else { @@ -2436,7 +2439,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa if (storeViewConfig.isPresent()) { // Validate and overwrite store views if they're getting set Map validatedViewConfigs = - validateAndDecorateStoreViewConfigs(storeViewConfig.get(), currStore); + validateAndDecorateStoreViewConfigs(clusterName, storeViewConfig.get(), currStore); setStore.views = StoreViewUtils.convertViewConfigMapToStoreViewRecordMap(validatedViewConfigs); updatedConfigsList.add(STORE_VIEW); } @@ -2911,18 +2914,25 @@ && getVeniceHelixAdmin().isHybrid(setStore.getHybridStoreConfig()) && setStore.g } } - private Map validateAndDecorateStoreViewConfigs(Map stringMap, Store store) { + private Map validateAndDecorateStoreViewConfigs( + String clusterName, + Map stringMap, + Store store) { Map configs = StoreViewUtils.convertStringMapViewToViewConfigMap(stringMap); Map validatedConfigs = new HashMap<>(); for (Map.Entry viewConfigEntry: configs.entrySet()) { ViewConfig validatedViewConfig = - validateAndDecorateStoreViewConfig(store, viewConfigEntry.getValue(), viewConfigEntry.getKey()); + validateAndDecorateStoreViewConfig(clusterName, store, viewConfigEntry.getValue(), viewConfigEntry.getKey()); validatedConfigs.put(viewConfigEntry.getKey(), validatedViewConfig); } return validatedConfigs; } - private ViewConfig validateAndDecorateStoreViewConfig(Store store, ViewConfig viewConfig, String viewName) { + private ViewConfig validateAndDecorateStoreViewConfig( + String clusterName, + Store store, + ViewConfig viewConfig, + String viewName) { // TODO: Pass a proper properties object here. 
Today this isn't used in this context if (viewConfig.getViewClassName().equals(MaterializedView.class.getCanonicalName())) { if (viewName.contains(VERSION_SEPARATOR)) { @@ -2944,7 +2954,46 @@ private ViewConfig validateAndDecorateStoreViewConfig(Store store, ViewConfig vi if (!viewParams.containsKey(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())) { decoratedViewParamBuilder.setPartitionCount(store.getPartitionCount()); } - viewConfig.setViewParameters(decoratedViewParamBuilder.build()); + viewParams = decoratedViewParamBuilder.build(); + + // Validate and decorate filter by and projection fields + Lazy valueSchema = Lazy.of(() -> getSupersetOrLatestValueSchema(clusterName, store.getName())); + Set filterByFields = new HashSet<>(); + Set projectionFields = new HashSet<>(); + if (viewParams.containsKey(MaterializedViewParameters.MATERIALIZED_VIEW_FILTER_BY_FIELDS.name())) { + String filterByFieldsString = + viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_FILTER_BY_FIELDS.name()); + filterByFields.addAll( + MaterializedViewParameters.parsePropertyStringToList( + filterByFieldsString, + MaterializedViewParameters.MATERIALIZED_VIEW_FILTER_BY_FIELDS.name())); + ViewUtils.validateFilterByFields(valueSchema.get(), filterByFields); + } + if (viewParams.containsKey(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name())) { + String projectionFieldsString = + viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name()); + projectionFields.addAll( + MaterializedViewParameters.parsePropertyStringToList( + projectionFieldsString, + MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name())); + if (projectionFields.isEmpty()) { + // Enabling projection with empty projection fields should be flagged as error. + throw new VeniceException("Projection fields cannot be set to an empty list"); + } + // If projection and filtering are both enabled we would like to include the filtering fields as part of + // the projection fields. + if (!filterByFields.isEmpty()) { + projectionFields.addAll(filterByFields); + } + } + if (!projectionFields.isEmpty()) { + String projectionSchemaString = ViewUtils.validateAndGenerateProjectionSchema( + valueSchema.get(), + projectionFields, + VeniceView.getProjectionSchemaName(store.getName(), viewName)); + viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_SCHEMA.name(), projectionSchemaString); + } + viewConfig.setViewParameters(viewParams); } VeniceView view = ViewUtils.getVeniceView( viewConfig.getViewClassName(), @@ -2955,6 +3004,12 @@ private ViewConfig validateAndDecorateStoreViewConfig(Store store, ViewConfig vi return viewConfig; } + private Schema getSupersetOrLatestValueSchema(String clusterName, String storeName) { + getVeniceHelixAdmin().checkControllerLeadershipFor(clusterName); + ReadWriteSchemaRepository schemaRepo = getHelixVeniceClusterResources(clusterName).getSchemaRepository(); + return schemaRepo.getSupersetOrLatestValueSchema(storeName).getSchema(); + } + private SupersetSchemaGenerator getSupersetSchemaGenerator(String clusterName) { if (externalSupersetSchemaGenerator.isPresent() && getMultiClusterConfigs().getControllerConfig(clusterName) .isParentExternalSupersetSchemaGenerationEnabled()) {
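Putting the controller-side decoration above together: when both filtering and projection are enabled the filter-by fields are folded into the projection fields, every field is validated against the superset or latest value schema, and the generated projection schema string is persisted under MATERIALIZED_VIEW_PROJECTION_SCHEMA. Below is a rough standalone sketch of that generation step; it uses plain Avro 1.9+ field constructors where the production code goes through AvroCompatibilityHelper, so treat it as an approximation:

import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;

public final class ProjectionSchemaGenerationSketch {
  // In the real flow this only runs when projection fields are configured on the view.
  public static String generate(
      Schema valueSchema,
      Set<String> projectionFields,
      Set<String> filterByFields,
      String schemaName) {
    Set<String> allFields = new LinkedHashSet<>(projectionFields);
    allFields.addAll(filterByFields); // filter-by fields ride along so view consumers can still see them
    List<Schema.Field> fields = new LinkedList<>();
    for (String fieldName: allFields) {
      Schema.Field field = valueSchema.getField(fieldName);
      if (field == null) {
        throw new IllegalArgumentException("Field: " + fieldName + " does not exist in the value schema");
      }
      if (!field.hasDefaultValue()) {
        throw new IllegalArgumentException("Default value is required for field: " + fieldName);
      }
      // Copy the field into the narrower projection schema, keeping its type and default.
      fields.add(new Schema.Field(field.name(), field.schema(), "", field.defaultVal()));
    }
    Schema projectionSchema = Schema.createRecord(schemaName, "", valueSchema.getNamespace(), false);
    projectionSchema.setFields(fields);
    return projectionSchema.toString();
  }
}

The persisted schema string is what MaterializedView.getProjectionSchema() returns and what ViewUtils.configureWriterForProjection parses on the write path.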