[server][common][controller][vpj] Materialized view projection and filter support #1647
Compare: 7098b0f to 4eb2f54
  for (Map.Entry<String, VeniceViewWriter> viewWriter: viewWriters.entrySet()) {
-   if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) {
+   if (viewWriter.getValue().getViewWriterType() == VeniceViewWriter.ViewWriterType.CHANGE_CAPTURE_VIEW) {
Nit: we can remove one call of viewWriter.getValue().getViewWriterType() if we hoist it before the if/else.
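A hedged sketch of the nit: compute the type once in a local variable and let the branches reuse it. ViewWriterType and the method names below are simplified stand-ins for the Venice classes, not the real API.

```java
class ViewDispatchSketch {
  enum ViewWriterType { CHANGE_CAPTURE_VIEW, MATERIALIZED_VIEW }

  static String dispatch(ViewWriterType viewWriterType) {
    // The call site would compute the type once, e.g.:
    // ViewWriterType viewWriterType = viewWriter.getValue().getViewWriterType();
    // so the getter is not invoked again in each branch.
    if (viewWriterType == ViewWriterType.CHANGE_CAPTURE_VIEW) {
      return "change-capture";
    } else {
      return "materialized";
    }
  }
}
```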
@@ -3179,9 +3194,19 @@ private PubSubMessageProcessedResult processMessage(
    KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
    byte[] keyBytes = kafkaKey.getKey();
    MessageType msgType = MessageType.valueOf(kafkaValue.messageType);
-   Lazy<GenericRecord> valueProvider;
+   Lazy<GenericRecord> valueProvider = Lazy.of(() -> null);
Can we make a static final variable in the Lazy class for the null Lazy (Lazy.of(() -> null))?
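A minimal stand-in for the Lazy class to illustrate the suggestion: a single shared static final constant for the "always null" Lazy, instead of allocating Lazy.of(() -> null) at every call site. Venice's real Lazy implementation differs in detail; this is only a sketch of the idea.

```java
import java.util.function.Supplier;

final class Lazy<T> {
  // One shared instance for the null case; safe to reuse because it is immutable.
  @SuppressWarnings("rawtypes")
  private static final Lazy NULL = new Lazy<>(() -> null);

  private final Supplier<T> supplier;

  private Lazy(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  static <T> Lazy<T> of(Supplier<T> supplier) {
    return new Lazy<>(supplier);
  }

  // Callers use Lazy.ofNull() instead of allocating a fresh Lazy.of(() -> null).
  @SuppressWarnings("unchecked")
  static <T> Lazy<T> ofNull() {
    return (Lazy<T>) NULL;
  }

  T get() {
    return supplier.get();
  }
}
```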
  if (hasFilterByFieldsMaterializedView) {
    // Copy the currValue since it will be used for in-place update(s).
    GenericRecord oldValue = new GenericData.Record((GenericData.Record) currValue, true);
You meant the filtering logic can modify the current value, right?
Is it guaranteed to always modify the current value? If not, I think we can do a lazy deep copy.
  if (hasViewWriters()) {
    Put newPut = writeComputeResultWrapper.getNewPut();
    Map<String, Set<Integer>> viewPartitionMap = null;
    consumerRecordWrapper.setProcessedResult(new PubSubMessageProcessedResult(writeComputeResultWrapper));
Why do we need to set the ProcessedResult here?
  // NR pass-through records are expected to carry view partition map in the message header
  viewPartitionMap = ViewUtils.extractViewPartitionMap(consumerRecord.getPubSubMessageHeaders());
  // Native replication (NR) pass-through mode
  partitionConsumptionState.addMessageToPendingMessages(consumerRecordWrapper);
We don't need to buffer it for non-projection/filtering views, right?
@@ -46,11 +47,24 @@ public MaterializedViewWriter(
        new MaterializedView(props.getCombinedProperties().toProperties(), version.getStoreName(), extraViewParameters);
    materializedViewTopicName =
        internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get();
-   this.veniceWriter = Lazy.of(
+   veniceWriter = Lazy.of(
Let us try to make veniceWriter final. For testing purposes, we can offer another constructor that lets the user of the class pass in a VeniceWriter, instead of exposing a setter.
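A hedged sketch of that suggestion: keep the writer field final and add a test-only constructor for injection. The class and type names below are simplified stand-ins (a Supplier of String in place of Lazy<VeniceWriter<...>>), not the real Venice signatures.

```java
import java.util.function.Supplier;

class MaterializedViewWriterSketch {
  // Final: assigned exactly once in a constructor, never via a setter.
  private final Supplier<String> veniceWriter;

  MaterializedViewWriterSketch() {
    // Production path: build the writer lazily on first use.
    this.veniceWriter = () -> "real-writer";
  }

  // Test-only constructor: the test injects its own writer.
  MaterializedViewWriterSketch(Supplier<String> testWriter) {
    this.veniceWriter = testWriter;
  }

  String writerFor() {
    return veniceWriter.get();
  }
}
```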
  writerProps.put(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS));
  writerProps.put(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS));
  // If materialized views are present we will disable speculative execution and use default GUID generator
  if (this.props.getString(PUSH_JOB_VIEW_CONFIGS, "").isEmpty()) {
Will it cause any issues if we always use deterministic GUID generator?
    return viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_SCHEMA.name());
  }

  public boolean isValueProviderNeeded() {
For a view with only filtering rules, we also need the ValueProvider, right?
  for (Schema.Field field: resultRecord.getSchema().getFields()) {
    inputRecordField = inputRecord.getSchema().getField(field.name());
    if (inputRecordField != null) {
      resultRecord.put(field.pos(), inputRecord.get(inputRecordField.pos()));
This might not work.
Assume:
1. Schema1 has two fields: f1, f2, and f1 has two sub-fields: f11, f12.
2. Schema2 has field f1, and f1 has three sub-fields: f11, f12, f13, all with default values.
3. Schema1 is the schema used by `inputRecord`.
4. Schema2 is the schema used by `resultRecord`.
With the logic here, resultRecord will carry f1 with only f11 and f12, but the schema of f1 contains f11, f12, and f13, so serialization of resultRecord will fail.
We should always keep in mind that default values only apply at deserialization time, not serialization time.
To be safe, we need to use SerDe to get a correct resultRecord: serialize inputRecord with inputRecord.schema into bytes, then deserialize the bytes with resultRecord.schema.
One more way is to always deserialize the record with the value schema that was used to generate the projection schema. That means we would need to persist that value schema in the view config and use it as the reader schema in SIT. This would guarantee the simple logic in this function works all the time, and it might be the most efficient way to handle projection/filtering, since schema evolution can remove filtering fields too.
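The SerDe round trip suggested above can be sketched as follows, assuming Apache Avro is on the classpath. The class and helper-method names are illustrative, not Venice APIs; the point is that Avro's schema resolution fills in defaults (e.g. f13) at deserialization time when the reader schema differs from the writer schema.

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

class ProjectionViaSerde {
  static GenericRecord project(GenericRecord inputRecord, Schema projectionSchema) throws Exception {
    // Serialize with the writer schema (inputRecord's own schema).
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(inputRecord.getSchema()).write(inputRecord, encoder);
    encoder.flush();
    // Deserialize with the projection (reader) schema: Avro schema resolution
    // applies field defaults here, so fields absent from the writer schema are filled in.
    DatumReader<GenericRecord> reader =
        new GenericDatumReader<>(inputRecord.getSchema(), projectionSchema);
    return reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
  }
}
```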
  if (oldValue == null) {
    return true;
  }
  if (newValue == null) {
Please check the above comment.
@@ -225,6 +226,7 @@ enum LatchStatus {

  // veniceWriterLazyRef could be set and get in different threads, mark it volatile.
  private volatile Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef;
+ private final LinkedList<PubSubMessageProcessedResultWrapper> pendingMessagesToLocalVT;
Any reason you wanted to use LinkedList instead of List itself?
  }

  public List<PubSubMessageProcessedResultWrapper> getAndClearPendingMessagesToLocalVT() {
    LinkedList<PubSubMessageProcessedResultWrapper> pendingMessages = (LinkedList) pendingMessagesToLocalVT.clone();
LinkedList.clone only does a shallow copy. Is this what we wanted?
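One way to sidestep the clone() semantics question is to swap the buffer out instead of copying it: hand the caller the old list and start a fresh one. This is a hedged, stdlib-only sketch; the class and method names are illustrative, not the Venice code.

```java
import java.util.ArrayList;
import java.util.List;

class PendingBuffer<T> {
  private List<T> pending = new ArrayList<>();

  synchronized void add(T message) {
    pending.add(message);
  }

  // Drain: return the accumulated list and replace it with an empty one.
  // No copy is made, so there is no shallow-vs-deep copy ambiguity.
  synchronized List<T> getAndClear() {
    List<T> drained = pending;
    pending = new ArrayList<>();
    return drained;
  }
}
```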
@@ -431,17 +431,15 @@ private AbstractVeniceWriter<byte[], byte[], byte[]> createCompositeVeniceWriter
    version.setRmdChunkingEnabled(rmdChunkingEnabled);
    // Default deser and decompress function for simple partitioner where value provider is never going to be used.
    BiFunction<byte[], Integer, GenericRecord> valueExtractor = (valueBytes, valueSchemaId) -> null;
-   boolean complexPartitionerConfigured = false;
+   boolean valueExtractorConfigured = false;
Make it volatile? My guess is that these booleans will be read by multiple threads; otherwise you wouldn't need such a boolean.
-   if (materializedView.getViewPartitioner()
-       .getPartitionerType() == VenicePartitioner.VenicePartitionerType.COMPLEX
-       && !complexPartitionerConfigured) {
+   if (materializedView.isValueProviderNeeded() && !valueExtractorConfigured) {
Is it possible that two threads both see valueExtractorConfigured as false and both enter this protected section? If so, would the following be better?
if (materializedView.isValueProviderNeeded() && valueExtractorConfigured.compareAndSet(false, true)) {
  ....
}
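The compareAndSet pattern suggested above can be sketched with an AtomicBoolean guard: exactly one thread wins the compare-and-set and performs the one-time configuration, even under concurrent calls. Names here (ValueExtractorGuard, maybeConfigure) are illustrative, not the Venice code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ValueExtractorGuard {
  private final AtomicBoolean valueExtractorConfigured = new AtomicBoolean(false);
  volatile int configureCount = 0; // stands in for the one-time configuration work

  void maybeConfigure() {
    // compareAndSet(false, true) returns true for exactly one caller;
    // every other caller (including concurrent ones) skips the section.
    if (valueExtractorConfigured.compareAndSet(false, true)) {
      configureCount++;
    }
  }
}
```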
Problem Statement
Consumers of a materialized view might not be interested in every data field and update event. The ability to perform projection and filtering could reduce footprint and improve ingestion performance for MV consumers.
Solution
Add projection and filtering support for materialized views (MV) to be more efficient about unwanted data for view consumers. Projection can be enabled by setting projection fields in the materialized view parameters. Similarly, filtering can be enabled by setting filter-by fields. These two features can be enabled separately or together. If enabled together, the filter-by fields are automatically included in the projecting fields. Here is an example MV configuration to illustrate the idea:
Record containing fields: {a, b, c, d, e}
Projecting fields: {b, c}
Filtering fields: {a}
The only filtering option for now is to skip the write if none of the filter-by fields changed. Filtering is also only applied during hybrid ingestion, since a change filter does not make sense on a batch push. With the above setup, we will project and write all batch data to the MV ({a, b, c}). RT updates (full PUT or UPDATE) will project and write the resulting record to the MV ({a, b, c}) only if the value of field (a) differs from the old value. All DELETE events will be written to the MV (no filtering).
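The change-filter semantics above can be modeled with a small stdlib-only sketch. This is an illustrative simplification (plain Maps standing in for Avro GenericRecords; the real logic lives in MaterializedViewWriter): write when any filter-by field changed, and never filter inserts or DELETE events.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class ChangeFilter {
  static boolean shouldWrite(Map<String, Object> oldValue,
                             Map<String, Object> newValue,
                             Set<String> filterByFields) {
    if (oldValue == null || newValue == null) {
      // No old value (insert) or no new value (DELETE): never filtered.
      return true;
    }
    for (String field : filterByFields) {
      if (!Objects.equals(oldValue.get(field), newValue.get(field))) {
        return true; // at least one filter-by field changed
      }
    }
    return false; // skip: none of the filter-by fields changed
  }
}
```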
Code changes
In order to achieve the above behavior there are several changes:
1. Previously we used pub-sub message headers to perform forwarding to handle chunks during NR pass-through in remote regions. This strategy will not work with projection because, in order to perform projection on batch data in remote regions, we need the remote partition leaders to assemble the chunks during NR pass-through. We are replacing the forwarding strategy with InMemoryChunkAssembler. To ensure leaders don't resume in between chunks, we also buffer and delay writing the chunks to the drainer until we have a fully assembled record and have produced it to the view topic(s). The view partition header code is left untouched in VPJ to remove deployment or rollback order requirements, i.e. VPJ can get ahead of the server. If the server gets ahead of VPJ that's fine too, because the server's new chunking support can function on its own. We can clean up everything once everything is deployed and stable (no more rollbacks).
2. Added enforcement in the controller to ensure view configs are immutable. The projection schema is generated when adding a new materialized view and stored with the view config. Since there can only be one schema version per view, the znode size should be manageable with compression. If this becomes a concern we can also store it separately or generate it on the fly. We also verify the filter-by fields and projection fields to ensure they exist in the latest superset or value schema and have default values.
3. Projection is performed in ComplexVeniceWriter as part of complexPut so both VPJ and leaders can use the same code for projection. Filtering is performed in MaterializedViewWriter since the current offering of change filter is applicable only to hybrid writes.
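The buffer-and-delay behavior in change 1 can be sketched with a stdlib-only model: chunks are held back, and only when the final chunk arrives is the assembled record released (at which point it can be produced to the view topic(s) and then written to the drainer). Class and method names are illustrative; the real code uses InMemoryChunkAssembler.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkBuffer {
  private final List<byte[]> pendingChunks = new ArrayList<>();

  // Returns the fully assembled record once the last chunk arrives, else null
  // (meaning: keep buffering, nothing goes to the drainer yet).
  byte[] addChunk(byte[] chunk, boolean isLastChunk) {
    pendingChunks.add(chunk);
    if (!isLastChunk) {
      return null;
    }
    int total = pendingChunks.stream().mapToInt(c -> c.length).sum();
    byte[] assembled = new byte[total];
    int pos = 0;
    for (byte[] c : pendingChunks) {
      System.arraycopy(c, 0, assembled, pos, c.length);
      pos += c.length;
    }
    pendingChunks.clear(); // ready for the next chunked record
    return assembled;
  }
}
```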
Concurrency-Specific Checks
Both reviewer and PR author to verify:
- Locks (synchronized, RWLock) are used where needed.
- Concurrent collections (ConcurrentHashMap, CopyOnWriteArrayList) are used where appropriate.
How was this PR tested?
Integration tests; will add unit tests once there is some consensus on the changes.
Does this PR introduce any user-facing or breaking changes?