-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53848] Add ability to support Alpha family in Theta Aggregates #52551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
d59f6df
to
c1d4fe8
Compare
c1d4fe8
to
13aa686
Compare
f2f3cb6
to
47ef17e
Compare
@cboumalh @mkaravel @dtenedor @cloud-fan Can you help review? |
Hi @karuppayya. Thanks for the effort here. Is the goal to deprecate the aggregation implementation you linked above and call the one from here directly in the Iceberg code? |
Yes, that's right. The goal is to deprecate the custom aggregation implementation(which was done since Spark didnt have the ability then) and have Iceberg call Spark's Theta aggregate/estimate functions directly. |
I see that makes sense. Just want to point out if the only need for iceberg is approximating count, Spark's HLL can achieve it at similar speeds (though not as fast) with a much smaller memory footprint. How would this work for your case? |
The selection of Alpha family sketch comes from the Iceberg Specification for NDV stats. |
Sounds good thank you, and yes for superior update speeds, the Alpha family is beneficial to users. I will review on my end, but don't have any write permissions. Thanks again for the info! |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ThetaSketchUtils.scala
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ThetaSketchUtils.scala
Outdated
Show resolved
Hide resolved
...main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
Outdated
Show resolved
Hide resolved
...main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
Outdated
Show resolved
Hide resolved
...main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
Show resolved
Hide resolved
...main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
Outdated
Show resolved
Hide resolved
* @since 4.1.0 | ||
*/ | ||
def theta_sketch_agg(e: Column, family: String): Column = | ||
theta_sketch_agg(e, 12, family) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hardcoded 12 matches the Catalyst default, so behaviorally it’s fine. Still, duplicating an internal default in the public layer is a bit awkward .We could consider making this explicit (functions above preferred) or defining a local constant for clarity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I gave some thoughts on this before adding it here. Ideally the resolution to use 12 for logNomEntries should be in ThetaSketchAgg
.
ie to say if I just pass the column and family, the logNomEntries gets defaulted in the ThetaSketchAgg
.
But,
FunctionRegistry selects the constructor based on the number of expressions. I cannot add a second constructor with same signature as this .
So i decided to default in functions.scala. I also didnt see similar pattern in a different function, so i am not very sure either.
Referencing a local constant in this class also seemed a bit weird, since that would be very specific.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend we remove these two theta_sketch_agg(columnName: String, family: String)
and theta_sketch_agg(e: Column, family: String)
completely. If we later add another argument that is a of String
type, we'll have function overloading ambiguity. Therefore, If a user wants to use a specific family, they must also pass in the lgNomEntries
explicitly. It will keep this file cleaner and avoid magic numbers. This will mean we also need to fix both builtin.py files to avoid the same phenomenon. I'm open to seeing what others have to say about this too.
6894ab8
to
9b7654a
Compare
37029cf
to
68feaf4
Compare
def theta_sketch_agg( | ||
col: "ColumnOrName", | ||
lgNomEntries: Optional[Union[int, Column]] = None, | ||
family: Optional[str] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the sake of consistency, we can consider changing this function to look like this:
def theta_sketch_agg(
col: "ColumnOrName",
lgNomEntries: Optional[Union[int, Column]] = None,
family: Optional["ColumnOrName"] = None,
) -> Column:
fn = "theta_sketch_agg"
_lgNomEntries = lit(12) if lgNomEntries is None else lit(lgNomEntries)
_family = lit("QUICKSELECT") if family is None else _to_col(family)
return _invoke_function_over_columns(fn, col, _lgNomEntries, _family)
Similarly in the other builtin.py
/* | ||
* QUICKSELECT is optimized for speed and is the default choice for most use cases, | ||
* providing faster updates and queries with slightly higher error rates. ALPHA offers | ||
* better accuracy with slightly higher resource consumption, making it suitable when | ||
* precision is more important than performance. The choice primarily affects the speed | ||
* vs accuracy trade-off. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, but this is not entirely true. consider this:
/*
* ALPHA is optimized for speed and offers slightly better initial accuracy
* (lower error) for simple updates. Its estimation
* precision reverts to the standard level if merged with other sketches.
* QUICKSELECT is the default and more flexible choice, providing the standard
* level of accuracy and full support for all set operations (Union, Intersection, etc.).
*/
|
||
override protected def withNewChildrenInternal(newFirst: Expression, newSecond: Expression, | ||
newThird: Expression): Expression = copy(newFirst, newSecond, | ||
newThird, mutableAggBufferOffset, inputAggBufferOffset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit, the withNewChildrenInternal override doesn’t need to explicitly pass mutableAggBufferOffset or inputAggBufferOffset. This is redundant. Return type should also be ThetaSketchAgg
. Lastly, can consider adding it above on line 146 to group it with the rest
can have something like this:
override protected def withNewChildrenInternal(
newFirst: Expression, newSecond: Expression, newThird: Expression): ThetaSketchAgg =
copy(
first = newFirst,
second = newSecond,
third = newThird)
}
Just a few comments left! Thanks @karuppayya! |
What changes were proposed in this pull request?
Adding ability to use ALPHA family for Theta Sketch
Why are the changes needed?
Theta sketch aggregate currently supports only quick select.
Consumers like Iceberg will benefit from the sketch aggregate if has the ability to use
ALPHA family
Iceberg specification to use ALPHA sketches
Custom implementation of theta sketch aggregates in Iceberg that can be replaced with Spark Theta aggregates
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
No