[server][common][controller][vpj] Materialized view projection and filter support #1647


Open: xunyin8 wants to merge 2 commits into main from mv-projection-support

Conversation

@xunyin8 (Contributor) commented Mar 24, 2025

Problem Statement

Consumers of a materialized view might not be interested in every data field and update event. The ability to perform projection and filtering could reduce footprint and improve ingestion performance for MV consumers.

Solution

Add projection and filtering support for materialized views (MV) to be more efficient about unwanted data for view consumers. Projection can be enabled by setting projection fields in the materialized view parameters. Similarly, filtering can be enabled by setting filter-by fields. These two features can be enabled separately or together. If enabled together, the filter-by fields are automatically included in the projecting fields. Here is an example MV configuration to illustrate the ideas:

Record containing fields: {a, b, c, d, e}
Projecting fields: {b, c}
Filtering fields: {a}

The only filtering option for now is to skip the record if none of the filter-by fields changed. Filtering is also only applied during hybrid ingestion, since it doesn't make sense to have a change filter on a batch push. With the above setup we will project and write all batch data to the MV ({a, b, c}). RT updates (full PUT or UPDATE) will project and write the resulting record to the MV ({a, b, c}) only if the value of field (a) differs from the old value. All DELETE events will be written to the MV (no filtering).
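
For illustration, here is a minimal sketch of how the two parameters interact for this example; the parameter keys below are hypothetical placeholders rather than the actual MaterializedViewParameters names, and the snippet only demonstrates how filter-by fields fold into the effective projection:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class MaterializedViewParamsSketch {
  public static void main(String[] args) {
    // Hypothetical parameter keys for the example record {a, b, c, d, e}.
    Map<String, String> viewParams = Map.of(
        "PROJECTION_FIELDS", "b,c",  // projecting fields
        "FILTER_BY_FIELDS", "a");    // filter-by fields (change filter)

    // Filter-by fields are automatically included in the projection,
    // so the record written to the MV carries {a, b, c}.
    Set<String> effectiveProjection = new LinkedHashSet<>();
    effectiveProjection.addAll(Arrays.asList(viewParams.get("PROJECTION_FIELDS").split(",")));
    effectiveProjection.addAll(Arrays.asList(viewParams.get("FILTER_BY_FIELDS").split(",")));
    System.out.println("Fields written to the MV: " + effectiveProjection); // [b, c, a]
  }
}
```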

Code changes

In order to achieve the above behavior, there are several changes:

  1. Previously we used pub-sub message headers to perform forwarding to handle chunks during NR pass-through in remote regions. This strategy will not work with projection, because in order to perform projection on batch data in remote regions, the remote partition leaders need to assemble the chunks during NR pass-through. We are replacing the forwarding strategy with InMemoryChunkAssembler. To ensure leaders don't resume in between chunks, we also buffer and delay writing the chunks to the drainer until we have a fully assembled record and have produced it to the view topic(s). The view partition header code is left untouched in VPJ to avoid deployment or rollback order requirements, i.e. VPJ can get ahead of the server. If the server gets ahead of VPJ that's fine too, because the server's new chunking support can function on its own. We can clean up everything once everything is deployed and stable (no more rollbacks).

  2. Added enforcement in the controller to ensure view configs are immutable. The projection schema is generated when adding a new materialized view and stored with the view config. Since there can only be one schema version per view, the znode size should be manageable with compression. If this becomes a concern we can also store it separately or generate it on the fly. We also verify the filter-by fields and projection fields to ensure they exist in the latest superset or value schema and have default values.

  3. Projection is performed in ComplexVeniceWriter as part of complexPut, so both VPJ and leaders can use the same code for projection. Filtering is performed in MaterializedViewWriter, since the current change-filter offering applies only to hybrid writes; a sketch of the skip decision follows this list.
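
To make the filter rule in (3) concrete, here is a minimal sketch of the skip decision, assuming Avro GenericRecord values; the actual implementation lives in MaterializedViewWriter and may differ in detail:

```java
import java.util.List;
import java.util.Objects;
import org.apache.avro.generic.GenericRecord;

public final class ChangeFilterSketch {
  private ChangeFilterSketch() {
  }

  /**
   * Returns true when the RT update should be skipped, i.e. none of the
   * filter-by fields changed between the old and new value. DELETE events
   * and records without an old value are never skipped.
   */
  public static boolean shouldSkip(GenericRecord oldValue, GenericRecord newValue, List<String> filterByFields) {
    if (oldValue == null || newValue == null) {
      // No old value to compare against (or a DELETE): always write to the MV.
      return false;
    }
    for (String fieldName : filterByFields) {
      if (!Objects.equals(oldValue.get(fieldName), newValue.get(fieldName))) {
        // At least one filter-by field changed, so the update is kept.
        return false;
      }
    }
    return true;
  }
}
```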

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

Integration tests; unit tests will be added once there is consensus on the changes.

  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

@xunyin8 force-pushed the mv-projection-support branch from 7098b0f to 4eb2f54 on March 24, 2025 17:41
for (Map.Entry<String, VeniceViewWriter> viewWriter: viewWriters.entrySet()) {
if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) {
if (viewWriter.getValue().getViewWriterType() == VeniceViewWriter.ViewWriterType.CHANGE_CAPTURE_VIEW) {

Contributor:
Nit: we can remove one call of viewWriter.getValue().getViewWriterType() if we do it before the if/else.
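
A sketch of that suggestion (fragment only; the surrounding loop and branch bodies are unchanged, and variable names follow the diff above):

```java
// Call getViewWriterType() once and branch on the cached value.
VeniceViewWriter.ViewWriterType type = viewWriter.getValue().getViewWriterType();
if (type == VeniceViewWriter.ViewWriterType.CHANGE_CAPTURE_VIEW) {
  // handle the change capture view writer
} else {
  // handle other view writer types (e.g. materialized view)
}
```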

@@ -3179,9 +3194,19 @@ private PubSubMessageProcessedResult processMessage(
KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
byte[] keyBytes = kafkaKey.getKey();
MessageType msgType = MessageType.valueOf(kafkaValue.messageType);
Lazy<GenericRecord> valueProvider;
Lazy<GenericRecord> valueProvider = Lazy.of(() -> null);

Contributor:
Can we add a static final variable in the Lazy class for the null Lazy, i.e. Lazy.of(() -> null)?
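
A sketch of that idea: share one "null" Lazy instance instead of allocating Lazy.of(() -> null) at every call site. The helper and import path are assumptions, not part of the current Lazy API:

```java
import com.linkedin.venice.utils.lazy.Lazy; // assumed import path for Venice's Lazy
import org.apache.avro.generic.GenericRecord;

final class LazyNullSketch {
  // Single shared instance that always supplies null.
  private static final Lazy<?> NULL_LAZY = Lazy.of(() -> null);

  @SuppressWarnings("unchecked")
  static <T> Lazy<T> ofNull() {
    return (Lazy<T>) NULL_LAZY;
  }

  // Usage matching the call site in the diff above.
  static Lazy<GenericRecord> defaultValueProvider() {
    return ofNull();
  }
}
```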


if (hasFilterByFieldsMaterializedView) {
// Copy the currValue since it will be used for in-place update(s).
GenericRecord oldValue = new GenericData.Record((GenericData.Record) currValue, true);

Contributor:
You meant the filtering logic can modify the current value, right?
Is it guaranteed that it will always modify the current value? If not, I think we can do a lazy deep copy.
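
A sketch of the "lazy deep copy" idea: defer the defensive copy of the current value until the filtering logic actually reads it. The Lazy import path and helper name are assumptions; the GenericData.Record copy constructor with deepCopy = true matches the diff above:

```java
import com.linkedin.venice.utils.lazy.Lazy; // assumed import path for Venice's Lazy
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

final class LazyDeepCopySketch {
  // The copy is only performed if and when the provider is consumed.
  static Lazy<GenericRecord> lazyOldValue(GenericRecord currValue) {
    return Lazy.of(() -> new GenericData.Record((GenericData.Record) currValue, true));
  }
}
```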

if (hasViewWriters()) {
Put newPut = writeComputeResultWrapper.getNewPut();
Map<String, Set<Integer>> viewPartitionMap = null;
consumerRecordWrapper.setProcessedResult(new PubSubMessageProcessedResult(writeComputeResultWrapper));

Contributor:
Why do we need to set up the ProcessedResult here?

// NR pass-through records are expected to carry view partition map in the message header
viewPartitionMap = ViewUtils.extractViewPartitionMap(consumerRecord.getPubSubMessageHeaders());
// Native replication (NR) pass-through mode
partitionConsumptionState.addMessageToPendingMessages(consumerRecordWrapper);

Contributor:
We don't need to buffer it for non-projection/filtering views, right?

@@ -46,11 +47,24 @@ public MaterializedViewWriter(
new MaterializedView(props.getCombinedProperties().toProperties(), version.getStoreName(), extraViewParameters);
materializedViewTopicName =
internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get();
this.veniceWriter = Lazy.of(
veniceWriter = Lazy.of(

Contributor:
Let us try to make veniceWriter final; for testing purposes, we can offer another constructor that lets the user of the class pass a VeniceWriter instead of exposing a setter.
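
A minimal, self-contained sketch of that pattern, using java.util.function.Supplier as a stand-in for Venice's Lazy; class and method names are illustrative, not the Venice API:

```java
import java.util.function.Supplier;

// Production constructor builds the writer lazily; a package-private constructor
// lets tests inject a pre-built writer, so the field can stay final.
class LazyWriterHolder<W> {
  private final Supplier<W> writer;

  LazyWriterHolder(Supplier<W> writerFactory) {
    this.writer = writerFactory;
  }

  // Test-only constructor: inject a writer instead of exposing a setter.
  LazyWriterHolder(W preBuiltWriter) {
    this.writer = () -> preBuiltWriter;
  }

  W getWriter() {
    return writer.get();
  }
}
```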

writerProps.put(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS));
writerProps.put(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS));
// If materialized views are present we will disable speculative execution and use default GUID generator
if (this.props.getString(PUSH_JOB_VIEW_CONFIGS, "").isEmpty()) {

Contributor:
Will it cause any issues if we always use the deterministic GUID generator?

return viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_SCHEMA.name());
}

public boolean isValueProviderNeeded() {

Contributor:
For a view with only filtering rules, we also need the ValueProvider, right?

for (Schema.Field field: resultRecord.getSchema().getFields()) {
inputRecordField = inputRecord.getSchema().getField(field.name());
if (inputRecordField != null) {
resultRecord.put(field.pos(), inputRecord.get(inputRecordField.pos()));

Contributor:
This might not work.
Assume:

1. Schema1 has two fields: f1, f2, and f1 has two sub-fields: f11, f12.
2. Schema2 also has field f1, and there f1 has three sub-fields: f11, f12, f13, all of which have a default value.
3. Schema1 is the schema used by `inputRecord`.
4. Schema2 is the schema used by `resultRecord`.

With the logic here, resultRecord will carry f1 with only f11 and f12, but the schema of f1 contains f11, f12, and f13, so the serialization of resultRecord will fail.
We should always keep in mind that default values only apply at deserialization time, not serialization time.
To be safe, we need to use SerDe to get a correct resultRecord, which means we need to serialize inputRecord with inputRecord's schema into bytes and deserialize the bytes with resultRecord's schema.

Another way is to always deserialize the record with the value schema that was used to generate the projection schema. That means we would need to persist that value schema in the view config and use it as the reader schema in SIT. This would guarantee that the simple logic in this function works all the time, and it might be the most efficient way to handle projection/filtering, since schema evolution can remove filtering fields too.
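
A minimal sketch of the SerDe round trip described above, using the standard Avro APIs: serialize with the input record's (writer) schema and deserialize with the projection (reader) schema so that missing fields such as f13 pick up their defaults during schema resolution:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public final class ProjectionSerDeSketch {
  private ProjectionSerDeSketch() {
  }

  public static GenericRecord project(GenericRecord inputRecord, Schema projectionSchema) throws IOException {
    // Serialize with the writer schema (the input record's own schema).
    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(inputRecord.getSchema());
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(inputRecord, encoder);
    encoder.flush();

    // Deserialize with the projection schema as the reader schema; fields absent
    // from the writer schema are filled from their defaults, extra fields are dropped.
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<>(inputRecord.getSchema(), projectionSchema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    return reader.read(null, decoder);
  }
}
```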

if (oldValue == null) {
return true;
}
if (newValue == null) {

Contributor:
Please check the above comment.

@@ -225,6 +226,7 @@ enum LatchStatus {

// veniceWriterLazyRef could be set and get in different threads, mark it volatile.
private volatile Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef;
private final LinkedList<PubSubMessageProcessedResultWrapper> pendingMessagesToLocalVT;

Any reason you wanted to use LinkedList instead of the List interface itself?

}

public List<PubSubMessageProcessedResultWrapper> getAndClearPendingMessagesToLocalVT() {
LinkedList<PubSubMessageProcessedResultWrapper> pendingMessages = (LinkedList) pendingMessagesToLocalVT.clone();

LinkedList.clone only does a shallow copy. Is this what we wanted?
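
For reference, a small sketch of an alternative that drains the pending list without clone(); like clone(), it copies only references to the wrapper elements, which is usually what "get and clear" needs. Names are illustrative:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

final class PendingMessagesSketch<T> {
  private final LinkedList<T> pending = new LinkedList<>();

  // Return a snapshot of the pending elements and clear the original list.
  List<T> getAndClear() {
    List<T> drained = new ArrayList<>(pending);
    pending.clear();
    return drained;
  }
}
```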

@@ -431,17 +431,15 @@ private AbstractVeniceWriter<byte[], byte[], byte[]> createCompositeVeniceWriter
version.setRmdChunkingEnabled(rmdChunkingEnabled);
// Default deser and decompress function for simple partitioner where value provider is never going to be used.
BiFunction<byte[], Integer, GenericRecord> valueExtractor = (valueBytes, valueSchemaId) -> null;
boolean complexPartitionerConfigured = false;
boolean valueExtractorConfigured = false;

@lusong64 commented Apr 18, 2025:
Make it volatile? My guess is that those booleans will be read by multiple threads; otherwise you wouldn't need such a boolean.

if (materializedView.getViewPartitioner()
.getPartitionerType() == VenicePartitioner.VenicePartitionerType.COMPLEX
&& !complexPartitionerConfigured) {
if (materializedView.isValueProviderNeeded() && !valueExtractorConfigured) {

Is it possible that two threads both see valueExtractorConfigured as false and both go into this protected section? If so, would the following be better?

if (materializedView.isValueProviderNeeded() && valueExtractorConfigured.compareAndSet(false, true)) {
  // ...
}
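
A minimal, self-contained sketch of that compareAndSet approach, assuming the flag becomes an AtomicBoolean; names are illustrative:

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class OneTimeConfigSketch {
  private final AtomicBoolean valueExtractorConfigured = new AtomicBoolean(false);

  // Only the first caller wins the race and runs the configuration; later callers
  // (and concurrent ones) see the flag already set and skip it.
  void maybeConfigure(Runnable configureValueExtractor) {
    if (valueExtractorConfigured.compareAndSet(false, true)) {
      configureValueExtractor.run();
    }
  }
}
```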
