From 4eb2f547f244053caaa0367ecf2fb6c29e6d9aa6 Mon Sep 17 00:00:00 2001
From: Xun Yin
Date: Sun, 23 Mar 2025 20:39:21 -0700
Subject: [PATCH 1/2] [server][common][controller][vpj] Materialized view projection and filter support

Add projection and filtering support for materialized views (MV) so that view
consumers do not have to ingest unwanted data. Projection can be enabled by
setting projection fields in the materialized view parameters. Similarly,
filtering can be enabled by setting filter-by fields. These two features can
be enabled separately or together. If enabled together, the filter-by fields
are automatically included in the projection fields. Here is an example MV
configuration to illustrate the ideas:

  Record containing fields: {a, b, c, d, e}
  Projection fields: {b, c}
  Filter-by fields: {a}

The only filtering option for now is to skip a record if none of the filter-by
fields changed. Filtering is also only applied during hybrid ingestion, since
a change filter does not make sense for a batch push. With the above setup we
will project and write all batch data to the MV ({a, b, c}). RT updates (full
PUT or UPDATE) will project and write the resulting record to the MV
({a, b, c}) only if the value of field (a) differs from the old value. All
DELETE events will be written to the MV (no filtering).

In order to achieve the above behavior there are several changes:

1. Previously we used pub-sub message headers to forward chunks to the correct
   view partitions during NR pass-through in remote regions. This strategy does
   not work with projection, because projecting batch data in remote regions
   requires the remote partition leaders to assemble the chunks during NR
   pass-through. We are replacing the forwarding strategy with
   InMemoryChunkAssembler. To ensure leaders don't resume in between chunks, we
   also buffer and delay writing the chunks to the drainer until we have a
   fully assembled record and have produced it to the view topic(s).

2. Added enforcement in the controller to ensure view configs are immutable.
   The projection schema is generated when adding a new materialized view and
   stored with the view config. Since there can only be one schema version per
   view, the znode size should be manageable with compression. If this becomes
   a concern we can also store it separately or generate it on the fly. We also
   verify the filter-by fields and projection fields to ensure they exist in
   the latest superset or value schema and have default values.

3. Projection is performed in ComplexVeniceWriter as part of complexPut so both
   VPJ and leaders can use the same code for projection. Filtering is performed
   in MaterializedViewWriter since the current change-filter offering applies
   only to hybrid writes.
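For illustration, the example configuration above could be expressed through
the MaterializedViewParameters.Builder setters introduced in this patch. This
is a minimal sketch, not part of the change itself: the view name, partitioner,
and partition count are placeholders, and the snippet assumes the builder
returns the view parameter map as Map<String, String>.

  import java.util.Arrays;
  import java.util.Map;
  import com.linkedin.venice.meta.MaterializedViewParameters;
  import com.linkedin.venice.partitioner.DefaultVenicePartitioner;

  // Record schema contains fields {a, b, c, d, e}.
  MaterializedViewParameters.Builder builder = new MaterializedViewParameters.Builder("exampleView");
  builder.setPartitionCount(6);
  builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
  // Project only {b, c}; filter-by fields are added to the projection
  // automatically, so the effective projection for the view is {a, b, c}.
  builder.setProjectionFields(Arrays.asList("b", "c"));
  // Hybrid writes are skipped unless the value of field "a" changed;
  // batch pushes and DELETE events are not filtered.
  builder.setFilterByFields(Arrays.asList("a"));
  Map<String, String> viewParams = builder.build();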
--- .../consumer/VeniceChangelogConsumerImpl.java | 9 +- .../ActiveActiveStoreIngestionTask.java | 21 +- .../LeaderFollowerStoreIngestionTask.java | 173 +++++--- .../consumer/MergeConflictResultWrapper.java | 29 +- .../consumer/PartitionConsumptionState.java | 13 + .../PubSubMessageProcessedResultWrapper.java | 9 + .../kafka/consumer/StoreIngestionTask.java | 20 +- .../consumer/WriteComputeResultWrapper.java | 20 +- .../NativeMetadataRepositoryViewAdapter.java | 28 ++ .../store/view/ChangeCaptureViewWriter.java | 8 +- .../store/view/MaterializedViewWriter.java | 65 ++- .../davinci/store/view/VeniceViewWriter.java | 21 +- .../LeaderFollowerStoreIngestionTaskTest.java | 9 +- .../view/ChangeCaptureViewWriterTest.java | 30 +- .../view/MaterializedViewWriterTest.java | 41 -- .../datawriter/AbstractPartitionWriter.java | 21 +- .../datawriter/CompositeVeniceWriter.java | 4 +- .../meta/MaterializedViewParameters.java | 66 ++- .../com/linkedin/venice/meta/Version.java | 5 + .../KeyWithChunkingSuffixSerializer.java | 7 + .../venice/views/MaterializedView.java | 24 ++ .../com/linkedin/venice/views/VeniceView.java | 9 + .../com/linkedin/venice/views/ViewUtils.java | 110 +++++ .../venice/writer/ComplexVeniceWriter.java | 68 ++-- .../TestMaterializedViewEndToEnd.java | 377 +++++++++++++++--- .../linkedin/venice/view/TestViewWriter.java | 8 +- .../venice/controller/VeniceHelixAdmin.java | 11 +- .../controller/VeniceParentHelixAdmin.java | 67 +++- 28 files changed, 980 insertions(+), 293 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index be4d57e637c..efd590ed0c4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -104,7 +104,7 @@ public class VeniceChangelogConsumerImpl implements VeniceChangelogConsumer { private static final Logger LOGGER = LogManager.getLogger(VeniceChangelogConsumerImpl.class); private static final int MAX_SUBSCRIBE_RETRIES = 5; - private static final String ROCKSDB_BUFFER_FOLDER = "rocksdb-chunk-buffer"; + private static final String ROCKSDB_BUFFER_FOLDER_PREFIX = "rocksdb-chunk-buffer-"; protected long subscribeTime = Long.MAX_VALUE; protected final ReadWriteLock subscriptionLock = new ReentrantReadWriteLock(); @@ -191,8 +191,11 @@ public VeniceChangelogConsumerImpl( throw new VeniceException("bootstrapFileSystemPath must be configured for consuming view store: " + storeName); } // Create a new folder in user provided path so if the path contains other important files we don't drop it. - rocksDBBufferProperties - .put(DATA_BASE_PATH, RocksDBUtils.composeStoreDbDir(rocksDBBufferPath, ROCKSDB_BUFFER_FOLDER)); + rocksDBBufferProperties.put( + DATA_BASE_PATH, + RocksDBUtils.composeStoreDbDir( + rocksDBBufferPath, + ROCKSDB_BUFFER_FOLDER_PREFIX + changelogClientConfig.getConsumerName())); // These properties are required to build a VeniceServerConfig but is never used by RocksDBStorageEngineFactory. // Instead of setting these configs, we could refactor RocksDBStorageEngineFactory to take a more generic config. 
rocksDBBufferProperties.put(CLUSTER_NAME, ""); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 5c953d514dd..f9cc575baa8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -71,6 +71,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -456,7 +457,8 @@ private PubSubMessageProcessedResult processActiveActiveMessage( consumerRecord.getTopicPartition(), valueManifestContainer, beforeProcessingBatchRecordsTimestampMs)); - if (hasChangeCaptureView || (hasComplexVenicePartitionerMaterializedView && msgType == MessageType.DELETE)) { + if (hasChangeCaptureView || hasFilterByFieldsMaterializedView + || (hasComplexVenicePartitionerMaterializedView && msgType == MessageType.DELETE)) { /** * Since this function will update the transient cache before writing the view, and if there is * a change capture view writer, we need to lookup first, otherwise the transient cache will be populated @@ -655,8 +657,9 @@ protected void processMessageAndMaybeProduceToKafka( // following function // call in this context much less obtrusive, however, it implies that all views can only work for AA stores - // Write to views - Runnable produceToVersionTopic = () -> producePutOrDeleteToKafka( + // Write to views. In A/A ingestion we never need to delay VT writes. Using local variables is sufficient to + // define the produceToVersionTopic consumer. + Consumer produceToVersionTopic = (ignored) -> producePutOrDeleteToKafka( mergeConflictResultWrapper, partitionConsumptionState, keyBytes, @@ -676,25 +679,25 @@ protected void processMessageAndMaybeProduceToKafka( ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get(); int oldValueSchemaId = oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId(); - Lazy valueProvider = mergeConflictResultWrapper.getValueProvider(); // The helper function takes in a BiFunction but the parameter for view partition set will never be used and // always null for A/A ingestion of the RT topic. queueUpVersionTopicWritesWithViewWriters( partitionConsumptionState, - (viewWriter, ignored) -> viewWriter.processRecord( + (viewWriter) -> viewWriter.processRecord( mergeConflictResultWrapper.getUpdatedValueBytes(), oldValueBB, keyBytes, mergeConflictResult.getValueSchemaId(), oldValueSchemaId, mergeConflictResult.getRmdRecord(), - valueProvider), - null, - produceToVersionTopic); + mergeConflictResultWrapper.getValueProvider(), + mergeConflictResultWrapper.getDeserializedOldValueProvider()), + produceToVersionTopic, + Collections.singletonList(consumerRecordWrapper)); } else { // This function may modify the original record in KME and it is unsafe to use the payload from KME directly // after this call. 
- produceToVersionTopic.run(); + produceToVersionTopic.accept(consumerRecordWrapper); } } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index b79924cc7ab..7ba8aef9378 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -31,8 +31,8 @@ import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; +import com.linkedin.davinci.store.record.ByteBufferValueRecord; import com.linkedin.davinci.store.record.ValueRecord; -import com.linkedin.davinci.store.view.ChangeCaptureViewWriter; import com.linkedin.davinci.store.view.MaterializedViewWriter; import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.davinci.validation.DivSnapshot; @@ -80,6 +80,7 @@ import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.serialization.AvroStoreDeserializerCache; +import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; import com.linkedin.venice.serializer.RecordDeserializer; @@ -91,7 +92,6 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; -import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.ChunkAwareCallback; import com.linkedin.venice.writer.LeaderCompleteState; import com.linkedin.venice.writer.LeaderMetadataWrapper; @@ -124,11 +124,13 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.LongPredicate; import java.util.function.Predicate; import java.util.function.Supplier; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.logging.log4j.LogManager; @@ -218,6 +220,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask { protected final Map viewWriters; protected final boolean hasChangeCaptureView; protected final boolean hasComplexVenicePartitionerMaterializedView; + protected final boolean hasFilterByFieldsMaterializedView; protected final AvroStoreDeserializerCache storeDeserializerCache; @@ -227,6 +230,8 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask { private final Version version; protected final ExecutorService aaWCIngestionStorageLookupThreadPool; + private final Lazy chunkingSuffixSerializer = + Lazy.of(KeyWithChunkingSuffixSerializer::new); public LeaderFollowerStoreIngestionTask( StorageService storageService, @@ -353,21 +358,31 @@ public LeaderFollowerStoreIngestionTask( schemaRepository.getKeySchema(store.getName()).getSchema()); boolean 
tmpValueForHasChangeCaptureViewWriter = false; boolean tmpValueForHasComplexVenicePartitioner = false; + boolean tmpValueForHasFilterByFields = false; for (Map.Entry viewWriter: viewWriters.entrySet()) { - if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) { + if (viewWriter.getValue().getViewWriterType() == VeniceViewWriter.ViewWriterType.CHANGE_CAPTURE_VIEW) { tmpValueForHasChangeCaptureViewWriter = true; } else if (viewWriter.getValue().getViewWriterType() == VeniceViewWriter.ViewWriterType.MATERIALIZED_VIEW) { - if (((MaterializedViewWriter) viewWriter.getValue()).isComplexVenicePartitioner()) { + MaterializedViewWriter materializedViewWriter = (MaterializedViewWriter) viewWriter.getValue(); + if (materializedViewWriter.isComplexVenicePartitioner()) { tmpValueForHasComplexVenicePartitioner = true; } + if (materializedViewWriter.isProjectionEnabled()) { + materializedViewWriter.configureWriterForProjection(compressor); + } + if (materializedViewWriter.isFilterByFieldsEnabled()) { + tmpValueForHasFilterByFields = true; + } } } hasChangeCaptureView = tmpValueForHasChangeCaptureViewWriter; hasComplexVenicePartitionerMaterializedView = tmpValueForHasComplexVenicePartitioner; + hasFilterByFieldsMaterializedView = tmpValueForHasFilterByFields; } else { viewWriters = Collections.emptyMap(); hasChangeCaptureView = false; hasComplexVenicePartitionerMaterializedView = false; + hasFilterByFieldsMaterializedView = false; } this.storeDeserializerCache = new AvroStoreDeserializerCache( builder.getSchemaRepo(), @@ -3179,9 +3194,19 @@ private PubSubMessageProcessedResult processMessage( KafkaMessageEnvelope kafkaValue = consumerRecord.getValue(); byte[] keyBytes = kafkaKey.getKey(); MessageType msgType = MessageType.valueOf(kafkaValue.messageType); - Lazy valueProvider; + Lazy valueProvider = Lazy.of(() -> null); + Lazy oldValueProvider = Lazy.of(() -> null); switch (msgType) { case PUT: + if (hasFilterByFieldsMaterializedView) { + GenericRecord oldValue = readStoredValueRecord( + partitionConsumptionState, + keyBytes, + schemaRepository.getSupersetSchema(storeName).getId(), + consumerRecord.getTopicPartition(), + new ChunkedValueManifestContainer()); + oldValueProvider = Lazy.of(() -> oldValue); + } Put put = (Put) kafkaValue.payloadUnion; // Value provider should use un-compressed data. final ByteBuffer rawPutValue = put.putValue; @@ -3221,7 +3246,8 @@ private PubSubMessageProcessedResult processMessage( null); } - return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(put, null, false, valueProvider)); + return new PubSubMessageProcessedResult( + new WriteComputeResultWrapper(put, null, false, valueProvider, oldValueProvider)); case UPDATE: /** @@ -3263,7 +3289,11 @@ private PubSubMessageProcessedResult processMessage( readerValueSchemaId, consumerRecord.getTopicPartition(), valueManifestContainer); - + if (hasFilterByFieldsMaterializedView) { + // Copy the currValue since it will be used for in-place update(s). 
+ GenericRecord oldValue = new GenericData.Record((GenericData.Record) currValue, true); + oldValueProvider = Lazy.of(() -> oldValue); + } final byte[] updatedValueBytes; final ChunkedValueManifest oldValueManifest = valueManifestContainer.getManifest(); WriteComputeResult writeComputeResult; @@ -3321,25 +3351,22 @@ private PubSubMessageProcessedResult processMessage( updatedPut, oldValueManifest, false, - Lazy.of(writeComputeResult::getUpdatedValue))); + Lazy.of(writeComputeResult::getUpdatedValue), + oldValueProvider)); } case DELETE: - Lazy oldValueProvider; if (hasComplexVenicePartitionerMaterializedView) { // Best-effort to provide the old value for delete operation in case needed by a ComplexVeniceWriter to // generate deletes for materialized view topic partition(s). We need to do a non-lazy lookup before, so we // have a chance of getting the old value before the transient record cache is updated to null as part of // processing the DELETE. - int oldValueReaderSchemaId = schemaRepository.getSupersetSchema(storeName).getId(); GenericRecord oldValue = readStoredValueRecord( partitionConsumptionState, keyBytes, - oldValueReaderSchemaId, + schemaRepository.getSupersetSchema(storeName).getId(), consumerRecord.getTopicPartition(), new ChunkedValueManifestContainer()); oldValueProvider = Lazy.of(() -> oldValue); - } else { - oldValueProvider = Lazy.of(() -> null); } /** * For WC enabled stores update the transient record map with the latest {key,null} for similar reason as mentioned in PUT above. @@ -3348,7 +3375,8 @@ private PubSubMessageProcessedResult processMessage( partitionConsumptionState .setTransientRecord(kafkaClusterId, consumerRecord.getPosition().getNumericOffset(), keyBytes, -1, null); } - return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false, oldValueProvider)); + return new PubSubMessageProcessedResult( + new WriteComputeResultWrapper(null, null, false, valueProvider, oldValueProvider)); default: throw new VeniceMessageException( @@ -3387,31 +3415,84 @@ protected void processMessageAndMaybeProduceToKafka( if (msgType.equals(UPDATE) && writeComputeResultWrapper.isSkipProduce()) { return; } - Runnable produceToVersionTopic = () -> produceToLocalKafkaHelper( - consumerRecord, - partitionConsumptionState, - writeComputeResultWrapper, - partition, - kafkaUrl, - kafkaClusterId, - beforeProcessingRecordTimestampNs); - // Write to views + Put newPut = writeComputeResultWrapper.getNewPut(); + // Write to views. 
if (hasViewWriters()) { - Put newPut = writeComputeResultWrapper.getNewPut(); - Map> viewPartitionMap = null; + consumerRecordWrapper.setProcessedResult(new PubSubMessageProcessedResult(writeComputeResultWrapper)); + consumerRecordWrapper.setBeforeProcessingPerRecordTimestampNs(beforeProcessingRecordTimestampNs); + Consumer produceToVersionTopic = + (messageWrapper) -> produceToLocalKafkaHelper( + messageWrapper.getMessage(), + partitionConsumptionState, + messageWrapper.getProcessedResult().getWriteComputeResultWrapper(), + partition, + kafkaUrl, + kafkaClusterId, + messageWrapper.getBeforeProcessingPerRecordTimestampNs()); if (!partitionConsumptionState.isEndOfPushReceived()) { - // NR pass-through records are expected to carry view partition map in the message header - viewPartitionMap = ViewUtils.extractViewPartitionMap(consumerRecord.getPubSubMessageHeaders()); + // Native replication (NR) pass-through mode + partitionConsumptionState.addMessageToPendingMessages(consumerRecordWrapper); + ByteBufferValueRecord valueRecord = chunkAssembler.get() + .bufferAndAssembleRecord( + consumerRecord.getTopicPartition(), + newPut.schemaId, + keyBytes, + newPut.putValue, + consumerRecord.getPosition().getNumericOffset(), + compressor.get()); + if (valueRecord != null) { + // Only process full records + Lazy valueProvider = Lazy.of(() -> { + try { + return storeDeserializerCache.getDeserializer(valueRecord.writerSchemaId(), valueRecord.writerSchemaId()) + .deserialize(compressor.get().decompress(valueRecord.value())); + } catch (IOException e) { + throw new VeniceException( + "Unable to provide value during NR pass-through due to decompression failure", + e); + } + }); + final byte[] originalKeyBytes; + if (isChunked) { + // valueRecord is not null for full records and manifest and the key for both is serialized with non chunked + // key suffix in source version topic if chunking is enabled. + originalKeyBytes = chunkingSuffixSerializer.get().extractKeyFromNonChunkedKeySuffix(keyBytes); + } else { + originalKeyBytes = keyBytes; + } + queueUpVersionTopicWritesWithViewWriters( + partitionConsumptionState, + (viewWriter) -> viewWriter.processRecord( + valueRecord.value(), + originalKeyBytes, + valueRecord.writerSchemaId(), + valueProvider, + Lazy.of(() -> null)), + produceToVersionTopic, + partitionConsumptionState.getAndClearPendingMessagesToLocalVT()); + } + } else { + // NR standard mode where upstream is RT topic. 
+ queueUpVersionTopicWritesWithViewWriters( + partitionConsumptionState, + (viewWriter) -> viewWriter.processRecord( + newPut.putValue, + keyBytes, + newPut.schemaId, + writeComputeResultWrapper.getValueProvider(), + writeComputeResultWrapper.getOldValueProvider()), + produceToVersionTopic, + Collections.singletonList(consumerRecordWrapper)); } - Lazy newValueProvider = writeComputeResultWrapper.getValueProvider(); - queueUpVersionTopicWritesWithViewWriters( - partitionConsumptionState, - (viewWriter, viewPartitionSet) -> viewWriter - .processRecord(newPut.putValue, keyBytes, newPut.schemaId, viewPartitionSet, newValueProvider), - viewPartitionMap, - produceToVersionTopic); } else { - produceToVersionTopic.run(); + produceToLocalKafkaHelper( + consumerRecord, + partitionConsumptionState, + writeComputeResultWrapper, + partition, + kafkaUrl, + kafkaClusterId, + beforeProcessingRecordTimestampNs); } } @@ -4100,9 +4181,9 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio protected void queueUpVersionTopicWritesWithViewWriters( PartitionConsumptionState partitionConsumptionState, - BiFunction, CompletableFuture> viewWriterRecordProcessor, - Map> viewPartitionMap, - Runnable versionTopicWrite) { + Function> viewWriterRecordProcessor, + Consumer versionTopicWrite, + List consumerRecordWrappers) { long preprocessingTime = System.currentTimeMillis(); CompletableFuture currentVersionTopicWrite = new CompletableFuture<>(); CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1]; @@ -4110,21 +4191,15 @@ protected void queueUpVersionTopicWritesWithViewWriters( // The first future is for the previous write to VT viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture(); for (VeniceViewWriter writer: viewWriters.values()) { - Set viewPartitionSet = null; - if (viewPartitionMap != null && writer.getViewWriterType() == VeniceViewWriter.ViewWriterType.MATERIALIZED_VIEW) { - MaterializedViewWriter mvWriter = (MaterializedViewWriter) writer; - viewPartitionSet = viewPartitionMap.get(mvWriter.getViewName()); - if (viewPartitionSet == null) { - throw new VeniceException("Unable to find view partition set for view: " + mvWriter.getViewName()); - } - } - viewWriterFutures[index++] = viewWriterRecordProcessor.apply(writer, viewPartitionSet); + viewWriterFutures[index++] = viewWriterRecordProcessor.apply(writer); } hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); if (exception == null) { - versionTopicWrite.run(); + for (PubSubMessageProcessedResultWrapper consumerRecordWrapper: consumerRecordWrappers) { + versionTopicWrite.accept(consumerRecordWrapper); + } currentVersionTopicWrite.complete(null); } else { VeniceException veniceException = new VeniceException(exception); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java index c5ded56590e..bf3fb2779c5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java @@ -26,10 +26,14 
@@ public class MergeConflictResultWrapper { private final ByteBuffer updatedRmdBytes; /** - * Best-effort deserialized value provider that provides the updated value for PUT/UPDATE and the old value for - * DELETE. + * Best-effort deserialized value provider that provides the updated value for PUT/UPDATE */ private final Lazy valueProvider; + /** + * Best-effort deserialized value provider that provides the updated value for PUT/UPDATE/DELETE. Returns null if the + * key/value didn't exist + */ + private final Lazy deserializedOldValueProvider; public MergeConflictResultWrapper( MergeConflictResult mergeConflictResult, @@ -47,15 +51,21 @@ public MergeConflictResultWrapper( this.oldValueManifestContainer = oldValueManifestContainer; this.updatedValueBytes = updatedValueBytes; this.updatedRmdBytes = updatedRmdBytes; - if (updatedValueBytes == null) { - // this is a DELETE + + // We will always configure the deserializedOldValueProvider. Theoretically we could cache the deserialized old + // value in the UPDATE branch, but it will require a deep copy since the record is used for in-place update(s). To + // reduce complexity we are just going to deserialize the old bytes. + this.deserializedOldValueProvider = Lazy.of(() -> { ByteBufferValueRecord oldValue = oldValueProvider.get(); if (oldValue == null || oldValue.value() == null) { - this.valueProvider = Lazy.of(() -> null); + return null; } else { - this.valueProvider = - Lazy.of(() -> deserializerProvider.apply(oldValue.writerSchemaId()).deserialize(oldValue.value())); + return deserializerProvider.apply(oldValue.writerSchemaId()).deserialize(oldValue.value()); } + }); + if (updatedValueBytes == null) { + // this is a DELETE + this.valueProvider = Lazy.of(() -> null); } else { // this is a PUT or UPDATE if (mergeConflictResult.getDeserializedValue().isPresent()) { @@ -100,10 +110,13 @@ public ByteBuffer getUpdatedRmdBytes() { /** * Return a best-effort value provider with the following behaviors: * 1. returns the new value provider for PUT and UPDATE. - * 2. returns the old value for DELETE (null for non-existent key). * 3. returns null if the value is not available. */ public Lazy getValueProvider() { return valueProvider; } + + public Lazy getDeserializedOldValueProvider() { + return deserializedOldValueProvider; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index 5e381d9dd0e..cca3d4a12b9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -20,6 +20,7 @@ import com.linkedin.venice.writer.VeniceWriter; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -225,6 +226,7 @@ enum LatchStatus { // veniceWriterLazyRef could be set and get in different threads, mark it volatile. 
private volatile Lazy> veniceWriterLazyRef; + private final LinkedList pendingMessagesToLocalVT; public PartitionConsumptionState(String replicaId, int partition, OffsetRecord offsetRecord, boolean hybrid) { this.replicaId = replicaId; @@ -270,6 +272,7 @@ public PartitionConsumptionState(String replicaId, int partition, OffsetRecord o this.leaderCompleteState = LeaderCompleteState.LEADER_NOT_COMPLETED; this.lastLeaderCompleteStateUpdateInMs = 0; this.pendingReportIncPushVersionList = offsetRecord.getPendingReportIncPushVersionList(); + this.pendingMessagesToLocalVT = new LinkedList<>(); } public int getPartition() { @@ -896,4 +899,14 @@ public void clearPendingReportIncPushVersionList() { pendingReportIncPushVersionList.clear(); offsetRecord.setPendingReportIncPushVersionList(pendingReportIncPushVersionList); } + + public void addMessageToPendingMessages(PubSubMessageProcessedResultWrapper message) { + pendingMessagesToLocalVT.add(message); + } + + public List getAndClearPendingMessagesToLocalVT() { + LinkedList pendingMessages = (LinkedList) pendingMessagesToLocalVT.clone(); + pendingMessagesToLocalVT.clear(); + return pendingMessages; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java index c6fed613ae4..28ce709f0c9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java @@ -6,6 +6,7 @@ public class PubSubMessageProcessedResultWrapper { private final DefaultPubSubMessage message; private PubSubMessageProcessedResult processedResult; + private long beforeProcessingPerRecordTimestampNs; public PubSubMessageProcessedResultWrapper(DefaultPubSubMessage message) { this.message = message; @@ -22,4 +23,12 @@ public PubSubMessageProcessedResult getProcessedResult() { public void setProcessedResult(PubSubMessageProcessedResult processedResult) { this.processedResult = processedResult; } + + public long getBeforeProcessingPerRecordTimestampNs() { + return beforeProcessingPerRecordTimestampNs; + } + + public void setBeforeProcessingPerRecordTimestampNs(long beforeProcessingPerRecordTimestampNs) { + this.beforeProcessingPerRecordTimestampNs = beforeProcessingPerRecordTimestampNs; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index e38faa09876..abd75e8007a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -333,7 +333,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final IngestionNotificationDispatcher ingestionNotificationDispatcher; - protected final InMemoryChunkAssembler chunkAssembler; + protected final Lazy chunkAssembler; private final Optional cacheBackend; private final Schema recordTransformerInputValueSchema; private final AvroGenericDeserializer recordTransformerKeyDeserializer; @@ -492,9 +492,9 @@ public StoreIngestionTask( new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion); 
this.missingSOPCheckExecutor.execute(() -> waitForStateVersion(kafkaVersionTopic)); this.cacheBackend = cacheBackend; + this.chunkAssembler = Lazy.of(() -> new InMemoryChunkAssembler(new InMemoryStorageEngine(storeName))); if (recordTransformerConfig != null && recordTransformerConfig.getRecordTransformerFunction() != null) { - this.chunkAssembler = new InMemoryChunkAssembler(new InMemoryStorageEngine(storeName)); Schema keySchema = schemaRepository.getKeySchema(storeName).getSchema(); this.recordTransformerKeyDeserializer = new AvroGenericDeserializer(keySchema, keySchema); this.recordTransformerInputValueSchema = schemaRepository.getSupersetOrLatestValueSchema(storeName).getSchema(); @@ -536,7 +536,6 @@ public StoreIngestionTask( this.recordTransformerKeyDeserializer = null; this.recordTransformerInputValueSchema = null; this.recordTransformerDeserializersByPutSchemaId = null; - this.chunkAssembler = null; } this.localKafkaServer = this.kafkaProps.getProperty(KAFKA_BOOTSTRAP_SERVERS); @@ -3835,13 +3834,14 @@ private int processKafkaDataMessage( if (recordTransformer != null && messageType == MessageType.PUT) { long recordTransformerStartTime = System.nanoTime(); - ByteBufferValueRecord assembledRecord = chunkAssembler.bufferAndAssembleRecord( - consumerRecord.getTopicPartition(), - put.getSchemaId(), - keyBytes, - put.getPutValue(), - consumerRecord.getPosition().getNumericOffset(), - compressor.get()); + ByteBufferValueRecord assembledRecord = chunkAssembler.get() + .bufferAndAssembleRecord( + consumerRecord.getTopicPartition(), + put.getSchemaId(), + keyBytes, + put.getPutValue(), + consumerRecord.getPosition().getNumericOffset(), + compressor.get()); // Current record is a chunk. We only write to the storage engine for fully assembled records if (assembledRecord == null) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java index a4e4a8f459c..b34f338a0d2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java @@ -17,20 +17,23 @@ public class WriteComputeResultWrapper { */ private final boolean skipProduce; private final Lazy valueProvider; + private final Lazy oldValueProvider; public WriteComputeResultWrapper(Put newPut, ChunkedValueManifest oldValueManifest, boolean skipProduce) { - this(newPut, oldValueManifest, skipProduce, Lazy.of(() -> null)); + this(newPut, oldValueManifest, skipProduce, Lazy.of(() -> null), Lazy.of(() -> null)); } public WriteComputeResultWrapper( Put newPut, ChunkedValueManifest oldValueManifest, boolean skipProduce, - Lazy valueProvider) { + Lazy valueProvider, + Lazy oldValueProvider) { this.newPut = newPut; this.oldValueManifest = oldValueManifest; this.skipProduce = skipProduce; this.valueProvider = valueProvider; + this.oldValueProvider = oldValueProvider; } public Put getNewPut() { @@ -48,10 +51,17 @@ public boolean isSkipProduce() { /** * Return a best-effort value provider with the following behaviors: * 1. returns the new value provider for PUT and UPDATE. - * 2. returns the old value for DELETE (null for non-existent key). - * 3. returns null if the value is not available. + * 2. returns null if the value is not available. 
*/ public Lazy getValueProvider() { - return this.valueProvider; + return valueProvider; + } + + /** + * Return a best-effort old value provider that returns the old value prior to the PUT/UPDATE/DELETE. Returns null if + * the k/v didn't exist or not available. + */ + public Lazy getOldValueProvider() { + return oldValueProvider; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java index 25206becf7b..5ca526b336c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java @@ -2,11 +2,14 @@ import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.meta.ClusterInfoProvider; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.meta.ReadOnlyViewStore; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreDataChangedListener; import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.schema.GeneratedSchemaID; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.rmd.RmdSchemaEntry; @@ -66,6 +69,31 @@ public SchemaEntry getKeySchema(String storeName) { @Override public SchemaEntry getValueSchema(String storeName, int id) { + if (VeniceView.isViewStore(storeName)) { + // We need to check if the view has projection enabled. + Store store = getStoreOrThrow(VeniceView.getStoreNameFromViewStoreName(storeName)); + String viewName = VeniceView.getViewNameFromViewStoreName(storeName); + // Search through the versions because it's possible that the view config is removed in the store config but a + // consumer is still ingesting the current/backup version that had the config. Ideally, we should only delete a + // view once we are sure all consumers are stopped (DVC/CC clients) but we can't really enforce that. + // In addition, this only works because view configs are immutable, otherwise we will need to know exactly which + // store version is the value schema for in order to provide the correct projection schema. + for (Version version: store.getVersions()) { + if (version.getViewConfigs().containsKey(viewName)) { + ViewConfig viewConfig = version.getViewConfigs().get(viewName); + if (viewConfig.getViewParameters() + .containsKey(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_SCHEMA.name())) { + String projectionSchemaString = viewConfig.getViewParameters() + .get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_SCHEMA.name()); + return new SchemaEntry(id, projectionSchemaString); + } else { + // This view does not have projection enabled. The regular store value schema will be returned accordingly. 
+ return nativeMetadataRepository.getValueSchema(VeniceView.getStoreName(storeName), id); + } + } + } + throw new VeniceNoStoreException(storeName); + } return nativeMetadataRepository.getValueSchema(VeniceView.getStoreName(storeName), id); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java index c29be3e2f3f..e884e308a9b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -67,7 +66,8 @@ public CompletableFuture processRecord( int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord, - Lazy valueProvider) { + Lazy valueProvider, + Lazy oldValueProvider) { // TODO: not sold about having currentValue in the interface but it VASTLY simplifies a lot of things with regards // to dealing with compression/chunking/etc. in the storage layer. @@ -89,8 +89,8 @@ public CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - Set viewPartitionSet, - Lazy newValueProvider) { + Lazy newValueProvider, + Lazy oldValueProvider) { // No op return CompletableFuture.completedFuture(null); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java index 861327c6c8b..f887944bbb9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java @@ -2,6 +2,7 @@ import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; +import com.linkedin.venice.compression.VeniceCompressor; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; @@ -12,13 +13,13 @@ import com.linkedin.venice.utils.ByteUtils; import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.ComplexVeniceWriter; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; import java.nio.ByteBuffer; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -33,7 +34,7 @@ public class MaterializedViewWriter extends VeniceViewWriter { private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory; private final MaterializedView internalView; private final String materializedViewTopicName; - private Lazy veniceWriter; + private Lazy> veniceWriter; public MaterializedViewWriter( VeniceConfigLoader props, @@ -46,11 +47,24 @@ public MaterializedViewWriter( new MaterializedView(props.getCombinedProperties().toProperties(), 
version.getStoreName(), extraViewParameters); materializedViewTopicName = internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get(); - this.veniceWriter = Lazy.of( + veniceWriter = Lazy.of( () -> new VeniceWriterFactory(props.getCombinedProperties().toProperties(), pubSubProducerAdapterFactory, null) .createComplexVeniceWriter(buildWriterOptions())); } + public void configureWriterForProjection(Lazy compressor) { + if (!isProjectionEnabled()) { + throw new VeniceException( + "Cannot configure writer for projection because projection is not enabled for view:" + + internalView.getViewName()); + } + ViewUtils.configureWriterForProjection( + veniceWriter.get(), + getViewName(), + compressor, + internalView.getProjectionSchema()); + } + /** * package private for testing purpose */ @@ -66,8 +80,9 @@ public CompletableFuture processRecord( int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord, - Lazy valueProvider) { - return processRecord(newValue, key, newValueSchemaId, null, valueProvider); + Lazy valueProvider, + Lazy oldValueProvider) { + return processRecord(newValue, key, newValueSchemaId, valueProvider, oldValueProvider); } /** @@ -79,24 +94,24 @@ public CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - Set viewPartitionSet, - Lazy newValueProvider) { - byte[] newValueBytes = newValue == null ? null : ByteUtils.extractByteArray(newValue); - if (viewPartitionSet != null) { - if (newValue == null) { - // This is unexpected because we only attach view partition map for PUT records. - throw new VeniceException( - "Encountered a null PUT record while having view partition map in the message header"); - } - // Forward the record to corresponding view partition without any processing (NR pass-through mode). - return veniceWriter.get().forwardPut(key, newValueBytes, newValueSchemaId, viewPartitionSet); - } + Lazy newValueProvider, + Lazy oldValueProvider) { if (newValue == null) { - // This is a delete operation. newValueProvider will contain the old value in a best effort manner. The old value - // might not be available if we are deleting a non-existing key. - return veniceWriter.get().complexDelete(key, newValueProvider); + // This is a delete operation. The old value might not be available if we are deleting a non-existing key. 
+ return veniceWriter.get().complexDelete(key, oldValueProvider); } - return veniceWriter.get().complexPut(key, newValueBytes, newValueSchemaId, newValueProvider); + if (isFilterByFieldsEnabled()) { + // We only support one type of filter operation which is to skip records if the filter by fields didn't change + if (!ViewUtils.changeFilter( + oldValueProvider.get(), + newValueProvider.get(), + internalView.getFilterByFields(), + getViewName())) { + // Did not pass the change filter requirement, skip this record + return CompletableFuture.completedFuture(null); + } + } + return veniceWriter.get().complexPut(key, ByteUtils.extractByteArray(newValue), newValueSchemaId, newValueProvider); } @Override @@ -132,4 +147,12 @@ public boolean isComplexVenicePartitioner() { public String getViewName() { return internalView.getViewName(); } + + public boolean isProjectionEnabled() { + return internalView.getProjectionSchema() != null; + } + + public boolean isFilterByFieldsEnabled() { + return !internalView.getFilterByFields().isEmpty(); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java index 9397c4e0432..a1b5e67bf98 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java @@ -15,7 +15,6 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -69,9 +68,9 @@ public VeniceViewWriter( * @param key the key of the record that designates newValue and oldValue * @param newValueSchemaId the schemaId of the incoming record * @param oldValueSchemaId the schemaId of the old record - * @param replicationMetadataRecord the associated RMD for the incoming record. - * @param valueProvider to provide the corresponding deserialized newValue for PUT and UPDATE or the old value for the - * given key for DELETE. + * @param replicationMetadataRecord the associated RMD for the incoming record + * @param valueProvider to provide the deserialized new value + * @param oldValueProvider to provide the deserialized old value */ public abstract CompletableFuture processRecord( ByteBuffer newValue, @@ -80,7 +79,8 @@ public abstract CompletableFuture processRecord( int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord, - Lazy valueProvider); + Lazy valueProvider, + Lazy oldValueProvider); /** * To be called as a given ingestion task consumes each record. This is called prior to writing to a @@ -89,18 +89,15 @@ public abstract CompletableFuture processRecord( * @param newValue the incoming fully specified value which hasn't yet been committed to Venice * @param key the key of the record that designates newValue and oldValue * @param newValueSchemaId the schemaId of the incoming record - * @param viewPartitionSet set of view partitions this record should be processed to. This is used in NR - * pass-through when remote region leaders can forward record or chunks of a record - * to the correct view partitions without the need to perform chunk assembly or - * repartitioning. 
- * @param newValueProvider to provide the deserialized new value + * @param valueProvider to provide the deserialized new value + * @param oldValueProvider to provide the deserialized old value */ public abstract CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - Set viewPartitionSet, - Lazy newValueProvider); + Lazy valueProvider, + Lazy oldValueProvider); public abstract ViewWriterType getViewWriterType(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java index f1ec0b1fdc0..4d2a234ead3 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java @@ -69,6 +69,7 @@ import it.unimi.dsi.fastutil.objects.Object2IntMaps; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -314,10 +315,10 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc .thenReturn(CompletableFuture.completedFuture(null)); leaderFollowerStoreIngestionTask.queueUpVersionTopicWritesWithViewWriters( mockPartitionConsumptionState, - (viewWriter, viewPartitionSet) -> viewWriter - .processRecord(mock(ByteBuffer.class), new byte[1], 1, viewPartitionSet, Lazy.of(() -> null)), - null, - () -> writeToVersionTopic.set(true)); + (viewWriter) -> viewWriter + .processRecord(mock(ByteBuffer.class), new byte[1], 1, Lazy.of(() -> null), Lazy.of(() -> null)), + (consumerRecordWrapper) -> writeToVersionTopic.set(true), + Collections.singletonList(mock(PubSubMessageProcessedResultWrapper.class))); verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture(); ArgumentCaptor vtWriteFutureCaptor = ArgumentCaptor.forClass(CompletableFuture.class); verify(mockPartitionConsumptionState, times(1)).setLastVTProduceCallFuture(vtWriteFutureCaptor.capture()); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java index 5d2831d389a..3206e8f8dfd 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java @@ -249,17 +249,41 @@ public void testProcessRecord() throws ExecutionException, InterruptedException Lazy dummyValueProvider = Lazy.of(() -> null); // Update Case changeCaptureViewWriter - .processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider) + .processRecord( + NEW_VALUE, + OLD_VALUE, + KEY, + 1, + 1, + rmdRecordWithValueLevelTimeStamp, + dummyValueProvider, + dummyValueProvider) .get(); // Insert Case changeCaptureViewWriter - .processRecord(NEW_VALUE, null, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider) + .processRecord( + NEW_VALUE, + null, + KEY, + 1, + 1, + rmdRecordWithValueLevelTimeStamp, + dummyValueProvider, + dummyValueProvider) .get(); // Deletion Case changeCaptureViewWriter - .processRecord(null, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider) + 
.processRecord( + null, + OLD_VALUE, + KEY, + 1, + 1, + rmdRecordWithValueLevelTimeStamp, + dummyValueProvider, + dummyValueProvider) .get(); // Set up argument captors diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java index 46cace62ec1..7cc20200642 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java @@ -5,11 +5,9 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -19,7 +17,6 @@ import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.davinci.utils.UnitTestComplexPartitioner; -import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; @@ -34,22 +31,17 @@ import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.VeniceProperties; -import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.writer.ComplexVeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.testng.Assert; import org.testng.annotations.Test; @@ -141,39 +133,6 @@ public void testProcessControlMessage() { verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); } - @Test - public void testViewWriterCanForwardCorrectly() { - String storeName = "testStoreWithChunkedKeys"; - String viewName = "testMaterializedViewWithChunkedKeys"; - Version version = mock(Version.class); - doReturn(true).when(version).isChunkingEnabled(); - doReturn(true).when(version).isRmdChunkingEnabled(); - getMockStore(storeName, 1, version); - MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName); - viewParamsBuilder.setPartitionCount(6); - viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); - Map viewParamsMap = viewParamsBuilder.build(); - VeniceConfigLoader props = getMockProps(); - MaterializedViewWriter materializedViewWriter = new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap); - ComplexVeniceWriter veniceWriter = mock(ComplexVeniceWriter.class); - doReturn(CompletableFuture.completedFuture(null)).when(veniceWriter).forwardPut(any(), any(), 
anyInt(), any()); - materializedViewWriter.setVeniceWriter(veniceWriter); - byte[] keyBytes = new byte[5]; - byte[] valueBytes = new byte[10]; - ByteBuffer value = ByteBuffer.wrap(valueBytes); - Set viewPartitionSet = new HashSet<>(); - viewPartitionSet.add(1); - viewPartitionSet.add(4); - Lazy valueProvider = mock(Lazy.class); - Assert.assertThrows( - VeniceException.class, - () -> materializedViewWriter.processRecord(null, keyBytes, 1, viewPartitionSet, valueProvider)); - materializedViewWriter.processRecord(value, keyBytes, 1, viewPartitionSet, valueProvider); - verify(veniceWriter, times(1)).forwardPut(eq(keyBytes), eq(valueBytes), eq(1), eq(viewPartitionSet)); - verify(veniceWriter, never()).complexPut(any(), any(), anyInt(), any()); - verify(veniceWriter, never()).complexDelete(any(), any()); - } - @Test public void testIsComplexVenicePartitioner() { String storeName = "testStore"; diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java index 906d871a7eb..6247863bc55 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java @@ -165,7 +165,7 @@ public int getValueSchemaId() { private Lazy veniceWriterFactory; private AbstractVeniceWriter veniceWriter = null; private VeniceWriter mainWriter = null; - private ComplexVeniceWriter[] childWriters = null; + private ComplexVeniceWriter[] childWriters = null; private int valueSchemaId = -1; private int derivedValueSchemaId = -1; private boolean enableWriteCompute = false; @@ -431,7 +431,7 @@ private AbstractVeniceWriter createCompositeVeniceWriter version.setRmdChunkingEnabled(rmdChunkingEnabled); // Default deser and decompress function for simple partitioner where value provider is never going to be used. 
BiFunction valueExtractor = (valueBytes, valueSchemaId) -> null; - boolean complexPartitionerConfigured = false; + boolean valueExtractorConfigured = false; int index = 0; for (ViewConfig viewConfig: viewConfigMap.values()) { VeniceView view = ViewUtils @@ -439,9 +439,7 @@ private AbstractVeniceWriter createCompositeVeniceWriter String viewTopic = view.getTopicNamesAndConfigsForVersion(versionNumber).keySet().stream().findAny().get(); if (view instanceof MaterializedView) { MaterializedView materializedView = (MaterializedView) view; - if (materializedView.getViewPartitioner() - .getPartitionerType() == VenicePartitioner.VenicePartitionerType.COMPLEX - && !complexPartitionerConfigured) { + if (materializedView.isValueProviderNeeded() && !valueExtractorConfigured) { // Initialize value schemas, deser cache and other variables needed by ComplexVenicePartitioner initializeSchemaSourceAndDeserCache(); compressor.get(); @@ -461,10 +459,19 @@ private AbstractVeniceWriter createCompositeVeniceWriter .deserialize(decompressedBytes); }; // We only need to configure these variables once per CompositeVeniceWriter - complexPartitionerConfigured = true; + valueExtractorConfigured = true; } - childWriters[index++] = + ComplexVeniceWriter complexVeniceWriter = factory.createComplexVeniceWriter(view.getWriterOptionsBuilder(viewTopic, version).build()); + if (materializedView.getProjectionSchema() != null) { + // Configure writer for projection support + ViewUtils.configureWriterForProjection( + complexVeniceWriter, + materializedView.getViewName(), + compressor, + materializedView.getProjectionSchema()); + } + childWriters[index++] = complexVeniceWriter; } else { throw new UnsupportedOperationException("Only materialized view is supported in VPJ"); } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java index 5ceb0714b8e..f82c5c54498 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java @@ -140,8 +140,8 @@ private CompletableFuture compositePut( Map> viewPartitionMap = new HashMap<>(); int index = 0; for (ComplexVeniceWriter writer: childWriters) { - // There should be an entry for every materialized view, even if the partition set is empty. This way we can - // differentiate between skipped view write and missing view partition info unexpectedly. + // There should be an entry for every materialized view in the view partition map, even if the partition set is + // empty. This way we can differentiate between skipped view write and missing view partition info unexpectedly. childFutures[index++] = writer.complexPut( key, value, diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java index ce9d27204f5..535d0254bb8 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java @@ -27,17 +27,27 @@ public enum MaterializedViewParameters { * Parameter key used to specify the partition count for the re-partition view. 
*/ MATERIALIZED_VIEW_PARTITION_COUNT, - + /** + * Parameter key used to specify the top level fields to perform filtering on for records to be produced to the view. + */ + MATERIALIZED_VIEW_FILTER_BY_FIELDS, /** * Parameter key used to specify the top level fields to perform projection on for records in the materialized view. */ - MATERIALIZED_VIEW_PROJECTION_FIELDS; + MATERIALIZED_VIEW_PROJECTION_FIELDS, + + /** + * Parameter key used to persist the generated projection schema. This is meant for internal use and purposely not + * exposed to the {@link Builder} + */ + MATERIALIZED_VIEW_PROJECTION_SCHEMA; public static class Builder { - private String viewName; + private final String viewName; private String partitioner; private String partitionerParams; private String partitionCount; + private List filterByFields = Collections.emptyList(); private List projectionFields = Collections.emptyList(); public Builder(String viewName) { @@ -49,13 +59,15 @@ public Builder(String viewName, Map viewParams) { this.partitioner = viewParams.get(MATERIALIZED_VIEW_PARTITIONER.name()); this.partitionerParams = viewParams.get(MATERIALIZED_VIEW_PARTITIONER_PARAMS.name()); this.partitionCount = viewParams.get(MATERIALIZED_VIEW_PARTITION_COUNT.name()); + String filteringFieldsString = viewParams.get(MATERIALIZED_VIEW_FILTER_BY_FIELDS.name()); + if (filteringFieldsString != null) { + this.filterByFields = + parsePropertyStringToList(filteringFieldsString, MATERIALIZED_VIEW_FILTER_BY_FIELDS.name()); + } String projectionFieldsString = viewParams.get(MATERIALIZED_VIEW_PROJECTION_FIELDS.name()); if (projectionFieldsString != null) { - try { - this.projectionFields = ObjectMapperFactory.getInstance().readValue(projectionFieldsString, List.class); - } catch (JsonProcessingException e) { - throw new VeniceException("Failed to parse the provided projection fields: " + projectionFieldsString, e); - } + this.projectionFields = + parsePropertyStringToList(projectionFieldsString, MATERIALIZED_VIEW_PROJECTION_FIELDS.name()); } } @@ -83,6 +95,11 @@ public Builder setPartitionCount(int partitionCount) { return this; } + public Builder setFilterByFields(List filterByFields) { + this.filterByFields = filterByFields; + return this; + } + public Builder setProjectionFields(List projectionFields) { this.projectionFields = projectionFields; return this; @@ -100,16 +117,35 @@ public Map build() { if (partitionCount != null) { viewParams.put(MATERIALIZED_VIEW_PARTITION_COUNT.name(), partitionCount); } + if (!filterByFields.isEmpty()) { + viewParams.put( + MATERIALIZED_VIEW_FILTER_BY_FIELDS.name(), + convertListToStringProperty(filterByFields, MATERIALIZED_VIEW_FILTER_BY_FIELDS.name())); + } if (!projectionFields.isEmpty()) { - try { - viewParams.put( - MATERIALIZED_VIEW_PROJECTION_FIELDS.name(), - ObjectMapperFactory.getInstance().writeValueAsString(projectionFields)); - } catch (JsonProcessingException e) { - throw new VeniceException("Failed to convert the projection fields to a string property", e); - } + viewParams.put( + MATERIALIZED_VIEW_PROJECTION_FIELDS.name(), + convertListToStringProperty(projectionFields, MATERIALIZED_VIEW_PROJECTION_FIELDS.name())); } return viewParams; } } + + public static String convertListToStringProperty(List list, String propertyName) { + try { + return ObjectMapperFactory.getInstance().writeValueAsString(list); + } catch (JsonProcessingException e) { + throw new VeniceException("Failed to convert list to a string property for property: " + propertyName); + } + } + + public static List 
parsePropertyStringToList(String propertyString, String propertyName) {
+ try {
+ return ObjectMapperFactory.getInstance().readValue(propertyString, List.class);
+ } catch (JsonProcessingException e) {
+ throw new VeniceException(
+ "Failed to parse the property string to list for property: " + propertyName + ", property string: "
+ + propertyString);
+ }
+ }
}
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java
index 7881473a3e0..056951b82aa 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java
@@ -393,6 +393,11 @@ static String parseStoreFromKafkaTopicName(String kafkaTopic) {
return parseStoreFromStreamReprocessingTopic(kafkaTopic);
} else if (isVersionTopic(kafkaTopic)) {
return parseStoreFromVersionTopic(kafkaTopic);
+ } else if (VeniceView.isMaterializedViewTopic(kafkaTopic)) {
+ // We have store view adapters for metadata repositories in view consumers (CC and DVC) that expect the store
+ // view name to be in a certain format in order to differentiate and retrieve the correct metadata. These adapters
+ // are generic and can be leveraged by other view types too, but as of now only the materialized view is using them.
+ return VeniceView.parseStoreAndViewFromViewTopic(kafkaTopic);
} else if (VeniceView.isViewTopic(kafkaTopic)) {
return VeniceView.parseStoreFromViewTopic(kafkaTopic);
}
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java
index eabce510d9d..20c866ccb3e 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java
@@ -53,6 +53,13 @@ public ByteBuffer serializeNonChunkedKey(ByteBuffer key) {
return serialize(key, serializedNonChunkKeySuffix);
}
+ public byte[] extractKeyFromNonChunkedKeySuffix(byte[] keyBytesWithNonChunkedKeySuffix) {
+ int keyLength = keyBytesWithNonChunkedKeySuffix.length - serializedNonChunkKeySuffix.length;
+ byte[] keyBytes = new byte[keyLength];
+ System.arraycopy(keyBytesWithNonChunkedKeySuffix, 0, keyBytes, 0, keyLength);
+ return keyBytes;
+ }
+ private ByteBuffer serialize(ByteBuffer key, byte[] encodedChunkedKeySuffix) {
/**
* Here will always allocate a new {@link ByteBuffer} to accommodate} the combination of key and chunked
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java
index f82011349a6..9596aa1ae78 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java
@@ -15,6 +15,7 @@ import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -29,6 +30,7 @@ public class MaterializedView extends VeniceView {
private final int viewPartitionCount;
private final PartitionerConfig partitionerConfig;
private Lazy viewPartitioner;
+ private final List filterByFields;
public
MaterializedView(Properties props, String storeName, Map viewParameters) { super(props, storeName, viewParameters); @@ -46,6 +48,15 @@ public MaterializedView(Properties props, String storeName, Map Map viewPartitionerParamsMap = PartitionUtils.getPartitionerParamsMap(viewPartitionerParamsString); this.partitionerConfig = new PartitionerConfigImpl(viewPartitionerClass, viewPartitionerParamsMap, DEFAULT_AMP_FACTOR); + String filterByFieldsString = + viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_FILTER_BY_FIELDS.name()); + if (filterByFieldsString != null) { + filterByFields = MaterializedViewParameters.parsePropertyStringToList( + filterByFieldsString, + MaterializedViewParameters.MATERIALIZED_VIEW_FILTER_BY_FIELDS.name()); + } else { + filterByFields = Collections.emptyList(); + } } @Override @@ -148,4 +159,17 @@ public String getViewName() { public PartitionerConfig getPartitionerConfig() { return partitionerConfig; } + + public String getProjectionSchema() { + return viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_SCHEMA.name()); + } + + public boolean isValueProviderNeeded() { + return getViewPartitioner().getPartitionerType() == VenicePartitioner.VenicePartitionerType.COMPLEX + || getProjectionSchema() != null; + } + + public List getFilterByFields() { + return filterByFields; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java index b0debb798a9..02f78ea5bcd 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java @@ -32,6 +32,7 @@ public abstract class VeniceView { public static final String VIEW_STORE_PREFIX = "view_store_"; public static final String VIEW_NAME_SEPARATOR = "_"; + public static final String PROJECTION_SCHEMA_SUFFIX = "projection"; protected final Properties props; protected final String storeName; protected final Map viewParameters; @@ -105,6 +106,10 @@ public static String parseStoreFromViewTopic(String topicName) { return topicName.substring(0, Version.getLastIndexOfVersionSeparator(topicName)); } + public static boolean isMaterializedViewTopic(String topicName) { + return topicName.endsWith(MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX); + } + // TODO: see above TODO for isViewTopic function, same applies here public static int parseVersionFromViewTopic(String topicName) { if (!isViewTopic(topicName)) { @@ -170,4 +175,8 @@ public static String getStoreName(String storeName) { return storeName; } } + + public static String getProjectionSchemaName(String storeName, String viewName) { + return storeName + VIEW_NAME_SEPARATOR + viewName + PROJECTION_SCHEMA_SUFFIX; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java index 36f7c0a6b27..2bb3dc71999 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java @@ -5,19 +5,30 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.venice.compression.VeniceCompressor; import 
com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.pubsub.api.PubSubMessageHeader;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
+import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
+import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.ReflectUtils;
import com.linkedin.venice.utils.VeniceProperties;
+import com.linkedin.venice.utils.lazy.Lazy;
+import com.linkedin.venice.writer.ComplexVeniceWriter;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
public class ViewUtils {
@@ -111,4 +122,103 @@ public static PubSubMessageHeader getViewDestinationPartitionHeader(
throw new VeniceException("Failed to serialize view destination partition map", e);
}
}
+
+ /**
+ * This uses {@link com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper} for field extraction. Therefore it
+ * has the same limitations as regular read compute: for Avro-1.9 or above we cannot guarantee the extracted
+ * field will be exactly the same as the existing field in terms of default value, since the extracted default value
+ * will be in Java format. This shouldn't be an issue for most use cases, but it's worth keeping in mind.
+ */
+ public static String validateAndGenerateProjectionSchema(
+ Schema latestValueSchema,
+ Set projectionFields,
+ String generatedSchemaName) {
+ if (latestValueSchema == null) {
+ throw new IllegalArgumentException("Latest value schema cannot be null");
+ }
+ List projectionSchemaFields = new LinkedList<>();
+
+ for (String fieldName: projectionFields) {
+ Schema.Field field = latestValueSchema.getField(fieldName);
+ if (field == null) {
+ throw new VeniceException("Field: " + fieldName + " does not exist in latest value schema");
+ }
+ if (!field.hasDefaultValue()) {
+ throw new VeniceException("Default value is required for field: " + fieldName);
+ }
+ projectionSchemaFields.add(AvroCompatibilityHelper.newField(field).setDoc("").build());
+ }
+
+ Schema generatedProjectionSchema =
+ Schema.createRecord(generatedSchemaName, "", latestValueSchema.getNamespace(), false);
+ generatedProjectionSchema.setFields(projectionSchemaFields);
+ return generatedProjectionSchema.toString();
+ }
+
+ public static void validateFilterByFields(Schema latestValueSchema, Set filterByFields) {
+ if (latestValueSchema == null) {
+ throw new IllegalArgumentException("Latest value schema cannot be null");
+ }
+ for (String fieldName: filterByFields) {
+ Schema.Field field = latestValueSchema.getField(fieldName);
+ if (field == null) {
+ throw new VeniceException("Field: " + fieldName + " does not exist in latest value schema");
+ }
+ }
+ }
+
+ public static void project(GenericRecord inputRecord, GenericRecord resultRecord) {
+ Schema.Field inputRecordField;
+ for (Schema.Field field: resultRecord.getSchema().getFields()) {
+ inputRecordField = inputRecord.getSchema().getField(field.name());
+ if (inputRecordField != null) {
+ resultRecord.put(field.pos(), inputRecord.get(inputRecordField.pos()));
+ }
+ }
+ }
+
+ /**
+ * @param oldValue of the record
+ * @param newValue of the record
+ * @param filterByFields to perform change
filter on
+ * @return the filter result, i.e. true to keep the record and false to filter it out.
+ */
+ public static boolean changeFilter(
+ GenericRecord oldValue,
+ GenericRecord newValue,
+ List filterByFields,
+ String viewName) {
+ if (oldValue == null) {
+ return true;
+ }
+ if (newValue == null) {
+ throw new VeniceException("Cannot perform filter because new value is null for view: " + viewName);
+ }
+ boolean changed = false;
+ for (String fieldName: filterByFields) {
+ Schema fieldSchema = oldValue.getSchema().getField(fieldName).schema();
+ if (GenericData.get().compare(oldValue.get(fieldName), newValue.get(fieldName), fieldSchema) != 0) {
+ changed = true;
+ break;
+ }
+ }
+ return changed;
+ }
+
+ public static void configureWriterForProjection(
+ ComplexVeniceWriter complexVeniceWriter,
+ String viewName,
+ Lazy compressor,
+ String projectionSchemaString) {
+ Schema projectionSchema = new Schema.Parser().parse(projectionSchemaString);
+ Lazy> serializer =
+ Lazy.of(() -> FastSerializerDeserializerFactory.getFastAvroGenericSerializer(projectionSchema));
+ complexVeniceWriter.configureWriterForProjection(projectionSchemaString, (projectionRecord) -> {
+ try {
+ return compressor.get().compress(serializer.get().serialize(projectionRecord));
+ } catch (IOException e) {
+ throw new VeniceException("Projection failed due to compression error for view: " + viewName, e);
+ }
+ });
+ }
}
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java
index f139b1fb531..a209368bc68 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java
@@ -1,23 +1,23 @@ package com.linkedin.venice.writer;
import com.linkedin.venice.exceptions.VeniceException;
-import com.linkedin.venice.kafka.protocol.Put;
-import com.linkedin.venice.kafka.protocol.enums.MessageType;
-import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.partitioner.ComplexVenicePartitioner;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
+import com.linkedin.venice.utils.AvroRecordUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.views.VeniceView;
-import java.util.Set;
+import com.linkedin.venice.views.ViewUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -29,6 +29,8 @@ public class ComplexVeniceWriter extends VeniceWriter {
private final ComplexVenicePartitioner complexPartitioner;
private final String viewName;
+ private GenericRecord reusableProjectionRecord = null;
+ private Function projectionSerializerAndCompressor;
public ComplexVeniceWriter(
VeniceWriterOptions params,
@@ -45,6 +47,14 @@ public ComplexVeniceWriter(
VeniceView.getViewNameFromViewStoreName(VeniceView.parseStoreAndViewFromViewTopic(params.getTopicName()));
}
+ public void
configureWriterForProjection( + String projectionSchemaString, + Function projectionSerializerAndCompressor) { + Schema schema = new Schema.Parser().parse(projectionSchemaString); + this.reusableProjectionRecord = new GenericData.Record(schema); + this.projectionSerializerAndCompressor = projectionSerializerAndCompressor; + } + public CompletableFuture complexPut(K key, V value, int valueSchemaId, Lazy valueProvider) { return complexPut(key, value, valueSchemaId, valueProvider, null, null, null); } @@ -64,14 +74,26 @@ public CompletableFuture complexPut( PutMetadata putMetadata) { CompletableFuture finalCompletableFuture = new CompletableFuture<>(); if (value == null) { - // Ignore null value throw new VeniceException("Put value should not be null"); } else { + V finalValue; + if (reusableProjectionRecord != null) { + AvroRecordUtils.clearRecord(reusableProjectionRecord); + GenericRecord record = valueProvider.get(); + if (record == null) { + throw new VeniceException("Projection failed due to unexpected null value for view: " + getViewName()); + } + ViewUtils.project(record, reusableProjectionRecord); + finalValue = projectionSerializerAndCompressor.apply(reusableProjectionRecord); + } else { + // No projection + finalValue = value; + } // Write updated/put record to materialized view topic partition(s) byte[] serializedKey = keySerializer.serialize(topicName, key); if (complexPartitioner == null) { // No VeniceComplexPartitioner involved, perform simple put. - byte[] serializedValue = valueSerializer.serialize(topicName, value); + byte[] serializedValue = valueSerializer.serialize(topicName, finalValue); int partition = getPartition(serializedKey); propagateVeniceWriterFuture( put(serializedKey, serializedValue, partition, valueSchemaId, callback, putMetadata), @@ -84,7 +106,7 @@ public CompletableFuture complexPut( if (partitions.length == 0) { finalCompletableFuture.complete(null); } else { - byte[] serializedValue = valueSerializer.serialize(topicName, value); + byte[] serializedValue = valueSerializer.serialize(topicName, finalValue); performMultiPartitionAction( partitions, finalCompletableFuture, @@ -98,38 +120,6 @@ public CompletableFuture complexPut( return finalCompletableFuture; } - /** - * Used during NR pass-through in remote region to forward records or chunks of records to corresponding view - * partition based on provided view partition map. This way the producing leader don't need to worry about large - * record assembly or chunking for view topic(s) when ingesting from source VT during NR pass-through. It's also - * expected to receive an empty partition set and in which case it's a no-op and we simply return a completed future. - * This is a valid use case since certain complex partitioner implementation could filter out records based on value - * fields and return an empty partition. 
- */ - public CompletableFuture forwardPut(K key, V value, int valueSchemaId, Set partitions) { - if (partitions.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - byte[] serializedKey = keySerializer.serialize(topicName, key); - byte[] serializedValue = valueSerializer.serialize(topicName, value); - KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, serializedKey); - Put putPayload = buildPutPayload(serializedValue, valueSchemaId, null); - int[] partitionArray = partitions.stream().mapToInt(i -> i).toArray(); - CompletableFuture finalCompletableFuture = new CompletableFuture<>(); - performMultiPartitionAction( - partitionArray, - finalCompletableFuture, - (partition) -> sendMessage( - producerMetadata -> kafkaKey, - MessageType.PUT, - putPayload, - partition, - null, - DEFAULT_LEADER_METADATA_WRAPPER, - APP_DEFAULT_LOGICAL_TS)); - return finalCompletableFuture; - } - /** * Perform "delete" on the given key. If a {@link ComplexVenicePartitioner} is involved then it will be a best effort * attempt to delete the record using the valueProvider. It's best effort because: diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java index a8ec09d79af..49a8cc607f3 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java @@ -79,6 +79,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; import org.apache.samza.system.SystemProducer; import org.testng.Assert; @@ -224,13 +225,7 @@ public void testBatchOnlyMaterializedViewDVCConsumer() throws IOException, Execu TestWriteUtils.runPushJob("Run push job", props); // Start a DVC client that's subscribed to partition 0 of the store's materialized view. The DVC client should // contain all data records. - VeniceProperties backendConfig = - new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) - .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) - .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) - .put(ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES, 2 * 1024 * 1024L) - .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) - .build(); + VeniceProperties backendConfig = getBasicBackendConfigForDVC(); DaVinciConfig daVinciConfig = new DaVinciConfig(); // Use non-source fabric region to also verify NR for materialized view. D2Client daVinciD2RemoteFabric = D2TestUtils @@ -285,13 +280,7 @@ public void testBatchOnlyMaterializedViewDVCConsumer() throws IOException, Execu D2ClientUtils.shutdownClient(daVinciD2RemoteFabric); } // Make sure things work in the source fabric too. 
- VeniceProperties newBackendConfig = - new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) - .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) - .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) - .put(ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES, 2 * 1024 * 1024L) - .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) - .build(); + VeniceProperties newBackendConfig = getBasicBackendConfigForDVC(); D2Client daVinciD2SourceFabric = D2TestUtils .getAndStartD2Client(multiRegionMultiClusterWrapper.getChildRegions().get(1).getZkServerWrapper().getAddress()); try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( @@ -353,13 +342,7 @@ public void testLargeValuePushMaterializedViewDVCConsumer() TestWriteUtils.runPushJob("Run push job", props); // Start a DVC client that's subscribed to all partitions of the store's materialized view. The DVC client should // contain all data records. - VeniceProperties backendConfig = - new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) - .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) - .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) - .put(ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES, 2 * 1024 * 1024L) - .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) - .build(); + VeniceProperties backendConfig = getBasicBackendConfigForDVC(); DaVinciConfig daVinciConfig = new DaVinciConfig(); // Use non-source fabric region to also verify NR for materialized view and chunks forwarding. D2Client daVinciD2RemoteFabric = D2TestUtils @@ -452,15 +435,15 @@ public void testLargeValuePushMaterializedViewCCConsumer() polledChangeEvents.put(key, afterImageEvent); } Assert.assertEquals(polledChangeEvents.size(), numberOfRecords); + viewTopicConsumer.close(); } /** - * Verification of the produced records is difficult because we don't really support complex partitioner in the - * read path. Once CC with views is supported we should use CC to verify. Perform re-push to ensure we can deserialize - * value properly during re-push. + * Perform re-push to ensure we can deserialize value properly during re-push. */ @Test(timeOut = TEST_TIMEOUT) - public void testMaterializedViewWithComplexPartitioner() throws IOException { + public void testMaterializedViewWithComplexPartitioner() + throws IOException, ExecutionException, InterruptedException { File inputDir = getTempDataDirectory(); Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV2Schema(inputDir); String inputDirPath = "file:" + inputDir.getAbsolutePath(); @@ -501,18 +484,33 @@ public void testMaterializedViewWithComplexPartitioner() throws IOException { }); } TestWriteUtils.runPushJob("Run push job", props); - String viewTopicName = - Version.composeKafkaTopic(storeName, 1) + VIEW_NAME_SEPARATOR + testViewName + MATERIALIZED_VIEW_TOPIC_SUFFIX; - // View topic partitions should be mostly empty based on the TestValueBasedPartitioner logic. - int expectedMaxEndOffset = 6; // This may change when we introduce more CMs e.g. 
heartbeats - for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) { - VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin(); - PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName); - Int2LongMap viewTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(viewPubSubTopic); - for (long endOffset: viewTopicOffsetMap.values()) { - Assert.assertTrue(endOffset <= expectedMaxEndOffset); - } - } + + // View topic partitions should be empty based on the TestValueBasedPartitioner logic. Start a CC consumer in source + // region to verify. + Properties consumerProperties = new Properties(); + consumerProperties.put( + KAFKA_BOOTSTRAP_SERVERS, + multiRegionMultiClusterWrapper.getChildRegions().get(0).getPubSubBrokerWrapper().getAddress()); + ChangelogClientConfig viewChangeLogClientConfig = getChangelogClientConfigForView( + testViewName, + consumerProperties, + multiRegionMultiClusterWrapper.getChildRegions().get(0).getZkServerWrapper().getAddress()); + VeniceChangelogConsumerClientFactory veniceViewChangelogConsumerClientFactory = + new VeniceChangelogConsumerClientFactory(viewChangeLogClientConfig, new MetricsRepository()); + VeniceChangelogConsumer viewTopicConsumer0 = + veniceViewChangelogConsumerClientFactory.getChangelogConsumer(storeName, "partition0"); + VeniceChangelogConsumer viewTopicConsumer1 = + veniceViewChangelogConsumerClientFactory.getChangelogConsumer(storeName, "partition1"); + Assert.assertTrue(viewTopicConsumer0 instanceof VeniceAfterImageConsumerImpl); + Assert.assertTrue(viewTopicConsumer1 instanceof VeniceAfterImageConsumerImpl); + viewTopicConsumer0.subscribe(Collections.singleton(0)).get(); + viewTopicConsumer1.subscribe(Collections.singleton(1)).get(); + Collection, VeniceChangeCoordinate>> pubSubMessages = + viewTopicConsumer0.poll(1000); + Assert.assertEquals(pubSubMessages.size(), 0); + pubSubMessages = viewTopicConsumer1.poll(1000); + Assert.assertEquals(pubSubMessages.size(), 0); + // Perform some partial updates in the non-NR source fabric VeniceClusterWrapper veniceCluster = childDatacenters.get(1).getClusters().get(clusterName); SystemProducer producer = @@ -526,25 +524,287 @@ public void testMaterializedViewWithComplexPartitioner() throws IOException { updateBuilder.setNewFieldValue("age", i); IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, key, updateBuilder.build(), newTimestamp); } + // age 1-9 will be written to all partitions so +9 in p0 and p1 // age 10-19 will be written to % numPartitions which will alternate so +5 in p0 and p1 - int newMinEndOffset = expectedMaxEndOffset + 9 + 5; - for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) { - VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin(); - PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName); - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { - Int2LongMap viewTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(viewPubSubTopic); - for (long endOffset: viewTopicOffsetMap.values()) { - Assert.assertTrue(endOffset >= newMinEndOffset); - } - }); + int expectedEventCount = 9 + 5; + Map polledEvents0 = new HashMap<>(); + Map polledEvents1 = new HashMap<>(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + for (PubSubMessage, VeniceChangeCoordinate> pubSubMessage: viewTopicConsumer0 + .poll(1000)) { + 
polledEvents0.put(pubSubMessage.getKey().toString(), pubSubMessage.getValue().getCurrentValue()); + } + for (PubSubMessage, VeniceChangeCoordinate> pubSubMessage: viewTopicConsumer1 + .poll(1000)) { + polledEvents1.put(pubSubMessage.getKey().toString(), pubSubMessage.getValue().getCurrentValue()); + } + Assert.assertEquals(polledEvents0.size(), expectedEventCount); + Assert.assertEquals(polledEvents1.size(), expectedEventCount); + }); + for (int i = 1; i < 10; i++) { + // These records should exist in both partitions + Assert.assertTrue(polledEvents0.containsKey(Integer.toString(i))); + Assert.assertTrue(polledEvents1.containsKey(Integer.toString(i))); } + for (int i = 10; i < 20; i++) { + // Alternate between p0 and p1 + if (i % 2 == 0) { + Assert.assertTrue(polledEvents0.containsKey(Integer.toString(i))); + } else { + Assert.assertTrue(polledEvents1.containsKey(Integer.toString(i))); + } + } + // A re-push should succeed Properties rePushProps = TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); rePushProps.setProperty(SOURCE_KAFKA, "true"); rePushProps.setProperty(KAFKA_INPUT_BROKER_URL, childDatacenters.get(0).getPubSubBrokerWrapper().getAddress()); TestWriteUtils.runPushJob("Run push job", rePushProps); + + viewTopicConsumer0.close(); + viewTopicConsumer1.close(); + } + + /** + * Test materialized view with projection using change log and DVC consumer. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testMaterializedViewWithProjectionUsingCCAndDVCConsumer() + throws IOException, ExecutionException, InterruptedException { + File inputDir = getTempDataDirectory(); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV2Schema(inputDir); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("complexPartitionStore"); + Properties props = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(false) + .setChunkingEnabled(true) + .setCompressionStrategy(CompressionStrategy.GZIP) + .setRmdChunkingEnabled(true) + .setNativeReplicationEnabled(true) + .setNativeReplicationSourceFabric(childDatacenters.get(0).getRegionName()) + .setPartitionCount(3) + .setActiveActiveReplicationEnabled(true) + .setWriteComputationEnabled(true) + .setHybridRewindSeconds(10L) + .setHybridOffsetLagThreshold(2L); + String testViewName = "projectionView"; + String projectionFieldName = "firstName"; + try (ControllerClient controllerClient = + IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) { + MaterializedViewParameters.Builder viewParamBuilder = + new MaterializedViewParameters.Builder(testViewName).setPartitionCount(1) + .setProjectionFields(Collections.singletonList(projectionFieldName)); + UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName) + .setViewClassName(MaterializedView.class.getCanonicalName()) + .setViewClassParams(viewParamBuilder.build()); + controllerClient + .retryableRequest(5, controllerClient1 -> controllerClient.updateStore(storeName, updateViewParam)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, () -> { + Map 
viewConfigMap = controllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 1); + Assert.assertEquals( + viewConfigMap.get(testViewName).getViewClassName(), + MaterializedView.class.getCanonicalName()); + }); + } + TestWriteUtils.runPushJob("Run push job", props); + + // Verify projection with change log consumer in all regions + String firstNamePrefix = "first_name_"; + for (VeniceMultiClusterWrapper region: multiRegionMultiClusterWrapper.getChildRegions()) { + Map polledEvents = getChangeLogConsumerAndPollEvents( + storeName, + testViewName, + region.getPubSubBrokerWrapper().getAddress(), + region.getZkServerWrapper().getAddress(), + 100); + // Check the projection result + for (int i = 1; i <= 100; i++) { + GenericRecord record = polledEvents.get(Integer.toString(i)); + Assert.assertNotNull(record); + Assert.assertEquals(record.get(projectionFieldName).toString(), firstNamePrefix + i); + Assert.assertEquals(record.getSchema().getFields().size(), 1); + } + } + + // TODO investigate DuplicateDataException in internalProcessConsumerRecord causing 2/3 of the keys to be missing + // Verify projection with DVC in all regions + /*for (VeniceMultiClusterWrapper region : multiRegionMultiClusterWrapper.getChildRegions()) { + VeniceProperties backendConfig = getBasicBackendConfigForDVC(); + DaVinciConfig daVinciConfig = new DaVinciConfig(); + D2Client daVinciD2RemoteFabric = D2TestUtils.getAndStartD2Client(region.getZkServerWrapper().getAddress()); + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + daVinciD2RemoteFabric, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + new MetricsRepository(), + backendConfig)) { + DaVinciClient viewClient = + factory.getAndStartGenericAvroClient(storeName, testViewName, daVinciConfig); + viewClient.subscribeAll().get(); + for (int i = 1; i <= 100; i++) { + GenericRecord record = (GenericRecord) viewClient.get(Integer.toString(i)).get(); + Assert.assertNotNull(record); + Assert.assertEquals(record.get(projectionFieldName).toString(), firstNamePrefix + i); + Assert.assertEquals(record.getSchema().getFields().size(), 1); + } + } finally { + D2ClientUtils.shutdownClient(daVinciD2RemoteFabric); + } + }*/ + } + + @Test(timeOut = TEST_TIMEOUT) + public void testMaterializedViewWithFilterAndProjectionUsingCCConsumer() + throws IOException, ExecutionException, InterruptedException { + File inputDir = getTempDataDirectory(); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV2Schema(inputDir); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("complexPartitionStore"); + Properties props = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + // Use an A/A W/C enabled store to verify correct partitioning after partial update is applied. 
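+ // The store is made hybrid with write compute enabled so that the partial updates sent later in this
+ // test flow through the RT path; the batch push below is expected to land in the view unfiltered, while
+ // the RT updates exercise the filter-by behavior on the "age" field.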
+ UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(false) + .setChunkingEnabled(true) + .setCompressionStrategy(CompressionStrategy.GZIP) + .setRmdChunkingEnabled(true) + .setNativeReplicationEnabled(true) + .setNativeReplicationSourceFabric(childDatacenters.get(0).getRegionName()) + .setPartitionCount(3) + .setActiveActiveReplicationEnabled(true) + .setWriteComputationEnabled(true) + .setHybridRewindSeconds(10L) + .setHybridOffsetLagThreshold(2L); + String testViewName = "filterProjectionView"; + String projectionFieldName = "firstName"; + String filterByFieldName = "age"; + try (ControllerClient controllerClient = + IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) { + MaterializedViewParameters.Builder viewParamBuilder = + new MaterializedViewParameters.Builder(testViewName).setPartitionCount(1) + .setProjectionFields(Collections.singletonList(projectionFieldName)) + .setFilterByFields(Collections.singletonList(filterByFieldName)); + UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName) + .setViewClassName(MaterializedView.class.getCanonicalName()) + .setViewClassParams(viewParamBuilder.build()); + controllerClient + .retryableRequest(5, controllerClient1 -> controllerClient.updateStore(storeName, updateViewParam)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, () -> { + Map viewConfigMap = controllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 1); + Assert.assertEquals( + viewConfigMap.get(testViewName).getViewClassName(), + MaterializedView.class.getCanonicalName()); + }); + } + TestWriteUtils.runPushJob("Run push job", props); + // Verify result with change log consumer in all regions + String firstNamePrefix = "first_name_"; + Map> consumerMap = new HashMap<>(); + for (VeniceMultiClusterWrapper region: multiRegionMultiClusterWrapper.getChildRegions()) { + VeniceChangelogConsumer consumer = getChangeLogConsumer( + storeName, + testViewName, + region.getPubSubBrokerWrapper().getAddress(), + region.getZkServerWrapper().getAddress()); + consumer.subscribeAll().get(); + consumerMap.put(region.getRegionName(), consumer); + Map polledEvents = pollEvents(consumer, 100); + // Check the projection and filter results. Nothing should be filtered from the batch push. 
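+ // Each polled value should contain exactly the projected field plus the filter-by field, since
+ // filter-by fields are folded into the generated projection schema ({firstName, age} in this test);
+ // age is asserted to be -1 for every batch record, presumably its schema default. The RT phase below
+ // then relies on the change filter, which keeps an update only when at least one filter-by field
+ // differs from the old value (or when there is no old value), roughly:
+ //   boolean keep = ViewUtils.changeFilter(oldValue, newValue, Collections.singletonList("age"), viewName);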
+ for (int i = 1; i <= 100; i++) { + GenericRecord record = polledEvents.get(Integer.toString(i)); + Assert.assertNotNull(record); + Assert.assertEquals(record.getSchema().getFields().size(), 2); + Assert.assertEquals(record.get(projectionFieldName).toString(), firstNamePrefix + i); + Assert.assertEquals(record.get(filterByFieldName), -1); + } + } + + // Perform some partial updates in the non-NR source fabric + String updatedFirstNamePrefix = "updated_first_name_"; + VeniceClusterWrapper veniceCluster = childDatacenters.get(1).getClusters().get(clusterName); + SystemProducer producer = + IntegrationTestPushUtils.getSamzaProducer(veniceCluster, storeName, Version.PushType.STREAM); + Schema partialUpdateSchema = WriteComputeSchemaConverter.getInstance() + .convertFromValueRecordSchema(recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema()); + long newTimestamp = 100000L; + int expectedFilteredEvents = 0; + for (int i = 1; i < 20; i++) { + String key = Integer.toString(i); + UpdateBuilder updateBuilder = new UpdateBuilderImpl(partialUpdateSchema); + if (i % 2 == 0) { + // only update age for some keys + updateBuilder.setNewFieldValue(filterByFieldName, i); + expectedFilteredEvents++; + } + updateBuilder.setNewFieldValue(projectionFieldName, updatedFirstNamePrefix + i); + IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, key, updateBuilder.build(), newTimestamp); + } + for (VeniceChangelogConsumer consumer: consumerMap.values()) { + Map polledEvents = pollEvents(consumer, expectedFilteredEvents); + for (int i = 1; i < 20; i++) { + if (i % 2 == 0) { + GenericRecord record = polledEvents.get(Integer.toString(i)); + Assert.assertNotNull(record); + Assert.assertEquals(record.getSchema().getFields().size(), 2); + Assert.assertEquals(record.get(projectionFieldName).toString(), updatedFirstNamePrefix + i); + Assert.assertEquals(record.get(filterByFieldName), i); + } + } + consumer.close(); + } + } + + private Map getChangeLogConsumerAndPollEvents( + String storeName, + String viewName, + String pubSubBrokerAddress, + String d2ZkAddress, + int expectedEventsCount) throws ExecutionException, InterruptedException { + VeniceChangelogConsumer viewTopicConsumer = null; + try { + viewTopicConsumer = getChangeLogConsumer(storeName, viewName, pubSubBrokerAddress, d2ZkAddress); + viewTopicConsumer.subscribeAll().get(); + return pollEvents(viewTopicConsumer, expectedEventsCount); + } finally { + if (viewTopicConsumer != null) { + viewTopicConsumer.close(); + } + } + } + + private VeniceChangelogConsumer getChangeLogConsumer( + String storeName, + String viewName, + String pubSubBrokerAddress, + String d2ZkAddress) { + Properties consumerProperties = new Properties(); + consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerAddress); + ChangelogClientConfig viewChangeLogClientConfig = + getChangelogClientConfigForView(viewName, consumerProperties, d2ZkAddress); + VeniceChangelogConsumerClientFactory veniceViewChangelogConsumerClientFactory = + new VeniceChangelogConsumerClientFactory(viewChangeLogClientConfig, new MetricsRepository()); + return veniceViewChangelogConsumerClientFactory.getChangelogConsumer(storeName); + } + + private Map pollEvents( + VeniceChangelogConsumer viewTopicConsumer, + int expectedEventsCount) { + Map polledEvents = new HashMap<>(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + for (PubSubMessage, VeniceChangeCoordinate> pubSubMessage: viewTopicConsumer + .poll(1000)) { + polledEvents.put(pubSubMessage.getKey().toString(), 
pubSubMessage.getValue().getCurrentValue()); + } + Assert.assertEquals(polledEvents.size(), expectedEventsCount); + }); + return polledEvents; } private double getMetric(MetricsRepository metricsRepository, String metricName, String storeName) { @@ -584,4 +844,27 @@ private void validateViewTopicAndVersionTopic( Assert.assertTrue(records > minRecordCount, "View topic records size: " + records); } } + + private ChangelogClientConfig getChangelogClientConfigForView( + String viewName, + Properties consumerProperties, + String localD2ZkHost) { + return new ChangelogClientConfig().setViewName(viewName) + .setConsumerProperties(consumerProperties) + .setControllerD2ServiceName(D2_SERVICE_NAME) + .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .setLocalD2ZkHosts(localD2ZkHost) + .setVersionSwapDetectionIntervalTimeInSeconds(3L) + .setControllerRequestRetryCount(3) + .setBootstrapFileSystemPath(getTempDataDirectory().getAbsolutePath()); + } + + private VeniceProperties getBasicBackendConfigForDVC() { + return new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) + .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) + .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES, 2 * 1024 * 1024L) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .build(); + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java index b6e18d2e93b..0c91d9c35b6 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java @@ -13,7 +13,6 @@ import com.linkedin.venice.utils.lazy.Lazy; import java.nio.ByteBuffer; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -40,7 +39,8 @@ public CompletableFuture processRecord( int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord, - Lazy valueProvider) { + Lazy valueProvider, + Lazy oldValueProvider) { internalView.incrementRecordCount(storeName); return CompletableFuture.completedFuture(null); @@ -51,8 +51,8 @@ public CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - Set viewPartitionSet, - Lazy newValueProvider) { + Lazy newValueProvider, + Lazy oldValueProvider) { internalView.incrementRecordCount(storeName); return CompletableFuture.completedFuture(null); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 29a7462c922..cfe8fc50ba7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -5765,20 +5765,23 @@ static Map mergeNewViewConfigsIntoOldConfigs( static Map addNewViewConfigsIntoOldConfigs( Store oldStore, - String viewClass, + String viewName, ViewConfig viewConfig) throws VeniceException { - // Add new view config into the existing config map. The new configs will override existing ones which share the - // same key. 
+ // Add new view config into the existing config map. For the safety of downstream consumers we do not allow in place + // updates to an existing view. Updates should be made by creating a new view and removing the old view. Map oldViewConfigMap = oldStore.getViewConfigs(); if (oldViewConfigMap == null) { oldViewConfigMap = new HashMap<>(); + } else if (oldViewConfigMap.containsKey(viewName)) { + throw new VeniceException( + "We do not support in place update of view configs for the safety of downstream consumers"); } Map mergedConfigs = StoreViewUtils.convertViewConfigMapToStoreViewRecordMap(oldViewConfigMap); StoreViewConfigRecord newStoreViewConfigRecord = StoreViewUtils.convertViewConfigToStoreViewConfigRecord(viewConfig); - mergedConfigs.put(viewClass, newStoreViewConfigRecord); + mergedConfigs.put(viewName, newStoreViewConfigRecord); return mergedConfigs; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 59cd1f22492..e9f4e424d3c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -191,6 +191,7 @@ import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.PartitionerConfig; +import com.linkedin.venice.meta.ReadWriteSchemaRepository; import com.linkedin.venice.meta.ReadWriteStoreRepository; import com.linkedin.venice.meta.RegionPushDetails; import com.linkedin.venice.meta.RoutersClusterConfig; @@ -244,6 +245,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; @@ -2423,7 +2425,8 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa } // If View parameter is not provided, use emtpy map instead. It does not inherit from existing config. 
ViewConfig viewConfig = new ViewConfigImpl(viewClassName.get(), viewParams.orElse(Collections.emptyMap())); - ViewConfig validatedViewConfig = validateAndDecorateStoreViewConfig(currStore, viewConfig, viewName.get()); + ViewConfig validatedViewConfig = + validateAndDecorateStoreViewConfig(clusterName, currStore, viewConfig, viewName.get()); updatedViewSettings = VeniceHelixAdmin.addNewViewConfigsIntoOldConfigs(currStore, viewName.get(), validatedViewConfig); } else { @@ -2436,7 +2439,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa if (storeViewConfig.isPresent()) { // Validate and overwrite store views if they're getting set Map validatedViewConfigs = - validateAndDecorateStoreViewConfigs(storeViewConfig.get(), currStore); + validateAndDecorateStoreViewConfigs(clusterName, storeViewConfig.get(), currStore); setStore.views = StoreViewUtils.convertViewConfigMapToStoreViewRecordMap(validatedViewConfigs); updatedConfigsList.add(STORE_VIEW); } @@ -2911,18 +2914,25 @@ && getVeniceHelixAdmin().isHybrid(setStore.getHybridStoreConfig()) && setStore.g } } - private Map validateAndDecorateStoreViewConfigs(Map stringMap, Store store) { + private Map validateAndDecorateStoreViewConfigs( + String clusterName, + Map stringMap, + Store store) { Map configs = StoreViewUtils.convertStringMapViewToViewConfigMap(stringMap); Map validatedConfigs = new HashMap<>(); for (Map.Entry viewConfigEntry: configs.entrySet()) { ViewConfig validatedViewConfig = - validateAndDecorateStoreViewConfig(store, viewConfigEntry.getValue(), viewConfigEntry.getKey()); + validateAndDecorateStoreViewConfig(clusterName, store, viewConfigEntry.getValue(), viewConfigEntry.getKey()); validatedConfigs.put(viewConfigEntry.getKey(), validatedViewConfig); } return validatedConfigs; } - private ViewConfig validateAndDecorateStoreViewConfig(Store store, ViewConfig viewConfig, String viewName) { + private ViewConfig validateAndDecorateStoreViewConfig( + String clusterName, + Store store, + ViewConfig viewConfig, + String viewName) { // TODO: Pass a proper properties object here. 
Today this isn't used in this context if (viewConfig.getViewClassName().equals(MaterializedView.class.getCanonicalName())) { if (viewName.contains(VERSION_SEPARATOR)) { @@ -2944,7 +2954,46 @@ private ViewConfig validateAndDecorateStoreViewConfig(Store store, ViewConfig vi if (!viewParams.containsKey(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())) { decoratedViewParamBuilder.setPartitionCount(store.getPartitionCount()); } - viewConfig.setViewParameters(decoratedViewParamBuilder.build()); + viewParams = decoratedViewParamBuilder.build(); + + // Validate and decorate filter by and projection fields + Lazy valueSchema = Lazy.of(() -> getSupersetOrLatestValueSchema(clusterName, store.getName())); + Set filterByFields = new HashSet<>(); + Set projectionFields = new HashSet<>(); + if (viewParams.containsKey(MaterializedViewParameters.MATERIALIZED_VIEW_FILTER_BY_FIELDS.name())) { + String filterByFieldsString = + viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_FILTER_BY_FIELDS.name()); + filterByFields.addAll( + MaterializedViewParameters.parsePropertyStringToList( + filterByFieldsString, + MaterializedViewParameters.MATERIALIZED_VIEW_FILTER_BY_FIELDS.name())); + ViewUtils.validateFilterByFields(valueSchema.get(), filterByFields); + } + if (viewParams.containsKey(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name())) { + String projectionFieldsString = + viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name()); + projectionFields.addAll( + MaterializedViewParameters.parsePropertyStringToList( + projectionFieldsString, + MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name())); + if (projectionFields.isEmpty()) { + // Enabling projection with empty projection fields should be flagged as error. + throw new VeniceException("Projection fields cannot be set to an empty list"); + } + // If projection and filtering are both enabled we would like to include the filtering fields as part of + // the projection fields. 
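+ // Hypothetical example: projection fields {firstName} combined with filter-by fields {age} produce a
+ // generated projection schema containing both firstName and age; the schema is named via
+ // VeniceView.getProjectionSchemaName(storeName, viewName) and persisted under the
+ // MATERIALIZED_VIEW_PROJECTION_SCHEMA view parameter below.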
+ if (!filterByFields.isEmpty()) { + projectionFields.addAll(filterByFields); + } + } + if (!projectionFields.isEmpty()) { + String projectionSchemaString = ViewUtils.validateAndGenerateProjectionSchema( + valueSchema.get(), + projectionFields, + VeniceView.getProjectionSchemaName(store.getName(), viewName)); + viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_SCHEMA.name(), projectionSchemaString); + } + viewConfig.setViewParameters(viewParams); } VeniceView view = ViewUtils.getVeniceView( viewConfig.getViewClassName(), @@ -2955,6 +3004,12 @@ private ViewConfig validateAndDecorateStoreViewConfig(Store store, ViewConfig vi return viewConfig; } + private Schema getSupersetOrLatestValueSchema(String clusterName, String storeName) { + getVeniceHelixAdmin().checkControllerLeadershipFor(clusterName); + ReadWriteSchemaRepository schemaRepo = getHelixVeniceClusterResources(clusterName).getSchemaRepository(); + return schemaRepo.getSupersetOrLatestValueSchema(storeName).getSchema(); + } + private SupersetSchemaGenerator getSupersetSchemaGenerator(String clusterName) { if (externalSupersetSchemaGenerator.isPresent() && getMultiClusterConfigs().getControllerConfig(clusterName) .isParentExternalSupersetSchemaGenerationEnabled()) { From e8447e0d5602ab7a790267c385318883af0867cd Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Tue, 25 Mar 2025 14:06:36 -0700 Subject: [PATCH 2/2] Fix DVC missing key due to DuplicateDataException and added more integration tests --- .../datawriter/jobs/DataWriterMRJob.java | 5 +- .../datawriter/AbstractPartitionWriter.java | 12 ++- .../TestMaterializedViewEndToEnd.java | 101 +++++++++++++++++- 3 files changed, 109 insertions(+), 9 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java index 7a74eb2d981..34722db610c 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java @@ -310,8 +310,11 @@ private void setupReducerConf(JobConf jobConf, PushJobSetting pushJobSetting) { jobConf.set(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.materializedViewConfigFlatMap); jobConf.set(VALUE_SCHEMA_DIR, pushJobSetting.valueSchemaDir); jobConf.set(RMD_SCHEMA_DIR, pushJobSetting.rmdSchemaDir); + // Disable speculative execution regardless of user config if materialized view is present + jobConf.setReduceSpeculativeExecution(false); + } else { + jobConf.setReduceSpeculativeExecution(vpjProperties.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false)); } - jobConf.setReduceSpeculativeExecution(vpjProperties.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false)); int partitionCount = pushJobSetting.partitionCount; jobConf.setInt(PARTITION_COUNT, partitionCount); jobConf.setNumReduceTasks(partitionCount); diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java index 6247863bc55..684859be152 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java @@ 
-615,10 +615,14 @@ protected void configureTask(VeniceProperties props) { writerProps.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, -1); EngineTaskConfigProvider engineTaskConfigProvider = getEngineTaskConfigProvider(); Properties jobProps = engineTaskConfigProvider.getJobProps(); - // Use the UUID bits created by the VPJ driver to build a producerGUID deterministically - writerProps.put(GuidUtils.GUID_GENERATOR_IMPLEMENTATION, GuidUtils.DETERMINISTIC_GUID_GENERATOR_IMPLEMENTATION); - writerProps.put(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS)); - writerProps.put(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS)); + // If materialized views are present we will disable speculative execution and use default GUID generator + if (this.props.getString(PUSH_JOB_VIEW_CONFIGS, "").isEmpty()) { + // Use the UUID bits created by the VPJ driver to build a producerGUID deterministically + writerProps.put(GuidUtils.GUID_GENERATOR_IMPLEMENTATION, GuidUtils.DETERMINISTIC_GUID_GENERATOR_IMPLEMENTATION); + writerProps.put(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS)); + writerProps + .put(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS)); + } return new VeniceWriterFactory(writerProps); }); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java index 49a8cc607f3..7cd230e7cfc 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java @@ -37,6 +37,7 @@ import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.integration.utils.D2TestUtils; import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; @@ -74,6 +75,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -179,11 +181,15 @@ public void testLFIngestionWithMaterializedView() throws IOException { TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); rePushProps.setProperty(SOURCE_KAFKA, "true"); rePushProps.setProperty(KAFKA_INPUT_BROKER_URL, childDatacenters.get(0).getPubSubBrokerWrapper().getAddress()); - TestWriteUtils.runPushJob("Run push job", rePushProps); + TestWriteUtils.runPushJob("Run re-push job", rePushProps); String rePushViewTopicName = Version.composeKafkaTopic(storeName, 2) + VIEW_NAME_SEPARATOR + testViewName + MATERIALIZED_VIEW_TOPIC_SUFFIX; String rePushVersionTopicName = Version.composeKafkaTopic(storeName, 2); validateViewTopicAndVersionTopic(rePushViewTopicName, rePushVersionTopicName, 6, 3, 100); + + // Perform another push v3 to ensure view resources for v1 are cleaned up + TestWriteUtils.runPushJob("Run push job v3", props); + 
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java
index 49a8cc607f3..7cd230e7cfc 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java
@@ -37,6 +37,7 @@
 import com.linkedin.venice.controller.VeniceHelixAdmin;
 import com.linkedin.venice.controllerapi.ControllerClient;
 import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
+import com.linkedin.venice.controllerapi.VersionCreationResponse;
 import com.linkedin.venice.integration.utils.D2TestUtils;
 import com.linkedin.venice.integration.utils.ServiceFactory;
 import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
@@ -74,6 +75,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -179,11 +181,15 @@ public void testLFIngestionWithMaterializedView() throws IOException {
         TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName);
     rePushProps.setProperty(SOURCE_KAFKA, "true");
     rePushProps.setProperty(KAFKA_INPUT_BROKER_URL, childDatacenters.get(0).getPubSubBrokerWrapper().getAddress());
-    TestWriteUtils.runPushJob("Run push job", rePushProps);
+    TestWriteUtils.runPushJob("Run re-push job", rePushProps);
     String rePushViewTopicName = Version.composeKafkaTopic(storeName, 2) + VIEW_NAME_SEPARATOR + testViewName
         + MATERIALIZED_VIEW_TOPIC_SUFFIX;
     String rePushVersionTopicName = Version.composeKafkaTopic(storeName, 2);
     validateViewTopicAndVersionTopic(rePushViewTopicName, rePushVersionTopicName, 6, 3, 100);
+
+    // Perform another push v3 to ensure view resources for v1 are cleaned up
+    TestWriteUtils.runPushJob("Run push job v3", props);
+
     verifyVersionAndViewTopicDeletion(viewTopicName, versionTopicName);
     }
   }
@@ -632,9 +638,8 @@ public void testMaterializedViewWithProjectionUsingCCAndDVCConsumer()
       }
     }
 
-    // TODO investigate DuplicateDataException in internalProcessConsumerRecord causing 2/3 of the keys to be missing
     // Verify projection with DVC in all regions
-    /*for (VeniceMultiClusterWrapper region : multiRegionMultiClusterWrapper.getChildRegions()) {
+    for (VeniceMultiClusterWrapper region: multiRegionMultiClusterWrapper.getChildRegions()) {
       VeniceProperties backendConfig = getBasicBackendConfigForDVC();
       DaVinciConfig daVinciConfig = new DaVinciConfig();
       D2Client daVinciD2RemoteFabric = D2TestUtils.getAndStartD2Client(region.getZkServerWrapper().getAddress());
@@ -655,7 +660,7 @@ public void testMaterializedViewWithProjectionUsingCCAndDVCConsumer()
       } finally {
         D2ClientUtils.shutdownClient(daVinciD2RemoteFabric);
       }
-    }*/
+    }
   }
 
   @Test(timeOut = TEST_TIMEOUT)
@@ -761,6 +766,75 @@ public void testMaterializedViewWithFilterAndProjectionUsingCCConsumer()
     }
   }
 
+  @Test(timeOut = TEST_TIMEOUT)
+  public void testFailedMaterializedViewPushCanBeCleanedUp() throws IOException {
+    // Create a store with materialized view and start a batch push
+    File inputDir = getTempDataDirectory();
+    Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir);
+    String inputDirPath = "file:" + inputDir.getAbsolutePath();
+    String storeName = Utils.getUniqueString("batchStore");
+    Properties props =
+        TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName);
+    String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString();
+    String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString();
+    UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(false)
+        .setChunkingEnabled(true)
+        .setRmdChunkingEnabled(true)
+        .setNativeReplicationEnabled(true)
+        .setNativeReplicationSourceFabric(childDatacenters.get(0).getRegionName())
+        .setPartitionCount(3);
+    String testViewName = "MaterializedViewTest";
+    try (ControllerClient controllerClient =
+        IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) {
+      MaterializedViewParameters.Builder viewParamBuilder =
+          new MaterializedViewParameters.Builder(testViewName).setPartitionCount(1);
+      UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName)
+          .setViewClassName(MaterializedView.class.getCanonicalName())
+          .setViewClassParams(viewParamBuilder.build());
+      controllerClient
+          .retryableRequest(5, controllerClient1 -> controllerClient.updateStore(storeName, updateViewParam));
+      TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, () -> {
+        Map<String, ViewConfig> viewConfigMap = controllerClient.getStore(storeName).getStore().getViewConfigs();
+        Assert.assertEquals(viewConfigMap.size(), 1);
+        Assert.assertEquals(
+            viewConfigMap.get(testViewName).getViewClassName(),
+            MaterializedView.class.getCanonicalName());
+      });
+      VersionCreationResponse versionCreationResponse = controllerClient.requestTopicForWrites(
+          storeName,
+          1000,
+          Version.PushType.BATCH,
+          "controller-client-started-test-push",
+          true,
+          true,
+          false,
+          Optional.empty(),
+          Optional.empty(),
+          Optional.empty(),
+          false,
+          -1);
+      Assert.assertFalse(versionCreationResponse.isError());
+      int newVersion = versionCreationResponse.getVersion();
+      String versionTopicName = Version.composeKafkaTopic(storeName, newVersion);
+      String viewTopicName = Version.composeKafkaTopic(storeName, newVersion) + VIEW_NAME_SEPARATOR + testViewName
+          + MATERIALIZED_VIEW_TOPIC_SUFFIX;
+      // Wait and verify the resources are created
+      for (VeniceMultiClusterWrapper veniceMultiClusterWrapper: childDatacenters) {
+        VeniceHelixAdmin admin = veniceMultiClusterWrapper.getRandomController().getVeniceHelixAdmin();
+        TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> {
+          Assert.assertTrue(admin.getStore(clusterName, storeName).containsVersion(newVersion));
+          Assert.assertTrue(
+              admin.getTopicManager().containsTopic(admin.getPubSubTopicRepository().getTopic(versionTopicName)));
+          Assert.assertTrue(
+              admin.getTopicManager().containsTopic(admin.getPubSubTopicRepository().getTopic(viewTopicName)));
+        });
+      }
+      // Kill the push job and ensure resources are cleaned up
+      controllerClient.killOfflinePushJob(versionTopicName);
+      verifyVersionAndViewTopicDeletion(viewTopicName, versionTopicName);
+    }
+  }
+
   private Map getChangeLogConsumerAndPollEvents(
       String storeName,
       String viewName,
@@ -845,6 +919,25 @@ private void validateViewTopicAndVersionTopic(
     }
   }
 
+  private void verifyVersionAndViewTopicDeletion(String viewTopicName, String versionTopicName) {
+    String storeName = Version.parseStoreFromVersionTopic(versionTopicName);
+    int versionNumber = Version.parseVersionFromVersionTopicName(versionTopicName);
+    for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) {
+      VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin();
+      TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> {
+        Assert.assertFalse(admin.getStore(clusterName, storeName).containsVersion(versionNumber));
+      });
+      PubSubTopic versionPubSubTopic = admin.getPubSubTopicRepository().getTopic(versionTopicName);
+      if (admin.getTopicManager().containsTopic(versionPubSubTopic)) {
+        Assert.assertTrue(admin.isTopicTruncated(versionTopicName));
+      }
+      PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName);
+      if (admin.getTopicManager().containsTopic(viewPubSubTopic)) {
+        Assert.assertTrue(admin.isTopicTruncated(viewTopicName));
+      }
+    }
+  }
+
   private ChangelogClientConfig getChangelogClientConfigForView(
       String viewName,
       Properties consumerProperties,