Conversation

karuppayya
Contributor

@karuppayya karuppayya commented Oct 8, 2025

What changes were proposed in this pull request?

Add the ability to use the ALPHA family for Theta sketches.

Why are the changes needed?

The Theta sketch aggregate currently supports only the QuickSelect family.
Consumers such as Iceberg would benefit if the sketch aggregate could also use the ALPHA family:
Iceberg specification to use ALPHA sketches
Custom implementation of theta sketch aggregates in Iceberg that can be replaced with Spark Theta aggregates

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@karuppayya
Contributor Author

@cboumalh @mkaravel @dtenedor @cloud-fan Can you help review?
cc: @aokolnychyi @huaxingao

@cboumalh
Contributor

Hi @karuppayya. Thanks for the effort here. Is the goal to deprecate the aggregation implementation you linked above and call the one from here directly in the Iceberg code?

@karuppayya
Contributor Author

karuppayya commented Oct 10, 2025

Yes, that's right. The goal is to deprecate the custom aggregation implementation (which was written because Spark didn't have this ability at the time) and have Iceberg call Spark's Theta aggregate/estimate functions directly.
I used Iceberg as a concrete example of a consumer that could use this functionality once it is available, but the change will benefit users in general.

@cboumalh
Contributor

cboumalh commented Oct 10, 2025

I see, that makes sense. I just want to point out that if Iceberg only needs an approximate distinct count, Spark's HLL can achieve that at similar speeds (though not quite as fast) with a much smaller memory footprint. How would this work for your case?

@karuppayya
Contributor Author

The selection of Alpha family sketch comes from the Iceberg Specification for NDV stats.
Changing this would break the interoperability guarantee that Iceberg provides across engines like Spark, Trino, and Flink.
I took Iceberg as an example, but having the flexibility to choose the sketch family will benefit users in general.

@cboumalh
Contributor

cboumalh commented Oct 10, 2025

Sounds good, thank you. And yes, for superior update speeds the Alpha family is beneficial to users. I will review on my end, but I don't have write permissions. Thanks again for the info!
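
For readers less familiar with what lgNomEntries controls, here is a toy, pure-Python distinct-count sketch in the KMV/theta style. It is a simplification written for illustration only: it is neither the DataSketches ALPHA nor the QUICKSELECT algorithm, and it is not Spark code. The class name ToyThetaSketch and its methods are hypothetical. The only assumption carried over from the real sketches is that the nominal entry count is k = 2^lgNomEntries.

```python
import hashlib
import heapq


class ToyThetaSketch:
    """Toy KMV-style sketch (hypothetical; not ALPHA, not QUICKSELECT)."""

    def __init__(self, lg_nom_entries=12):
        self.k = 1 << lg_nom_entries   # nominal entries: k = 2^lgNomEntries
        self.theta = 1.0               # sampling threshold in (0, 1]
        self._heap = []                # max-heap of retained hashes (stored negated)
        self._seen = set()             # retained hashes, for de-duplication

    @staticmethod
    def _hash01(item):
        # Map an item to a pseudo-uniform float in [0, 1].
        digest = hashlib.sha256(repr(item).encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") / 2**64

    def update(self, item):
        h = self._hash01(item)
        if h >= self.theta or h in self._seen:
            return                     # above the threshold, or a duplicate
        self._seen.add(h)
        heapq.heappush(self._heap, -h)
        if len(self._heap) > self.k:
            # Evict the largest retained hash; it becomes the new threshold.
            largest = -heapq.heappop(self._heap)
            self._seen.discard(largest)
            self.theta = largest

    def estimate(self):
        # Exact while theta == 1.0; otherwise retained count scaled by 1/theta.
        return len(self._heap) / self.theta


def estimate_distinct(items, lg_nom_entries=12):
    sketch = ToyThetaSketch(lg_nom_entries)
    for item in items:
        sketch.update(item)
    return sketch.estimate()
```

With fewer than k distinct inputs the toy sketch stays in exact mode; beyond that, a larger lgNomEntries keeps more hashes and lowers the expected relative error at the cost of memory.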

   * @since 4.1.0
   */
  def theta_sketch_agg(e: Column, family: String): Column =
    theta_sketch_agg(e, 12, family)
Contributor

@cboumalh cboumalh Oct 10, 2025

The hardcoded 12 matches the Catalyst default, so behaviorally it's fine. Still, duplicating an internal default in the public layer is a bit awkward. We could consider making this explicit (as the functions above do, which I'd prefer) or defining a local constant for clarity.

Contributor Author

Yeah, I gave this some thought before adding it here. Ideally, the decision to default lgNomEntries to 12 should live in ThetaSketchAgg.
That is, if I pass only the column and the family, ThetaSketchAgg would fill in the default lgNomEntries.

However, FunctionRegistry selects the constructor based on the number of expressions, so I cannot add a second two-expression constructor alongside the existing one.
So I decided to apply the default in functions.scala. I also didn't see a similar pattern in any other function, so I am not entirely sure about this either.
Referencing a local constant in this class also seemed a bit odd, since it would be very specific to this one function.
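
To make the constraint concrete, here is a minimal toy model in Python of a registry that picks a builder purely by argument count. It is not Spark's FunctionRegistry; ArityRegistry, try_register_col_family, and the tuple-returning builders are hypothetical names used only for illustration. The defaults (12 and QUICKSELECT) are the ones mentioned in this thread.

```python
from typing import Callable, Dict, Optional, Tuple

DEFAULT_LG_NOM_ENTRIES = 12        # default discussed in this thread
DEFAULT_FAMILY = "QUICKSELECT"     # default family discussed in this thread


class ArityRegistry:
    """Toy registry (hypothetical) that dispatches on argument count only."""

    def __init__(self) -> None:
        self._builders: Dict[int, Callable[..., Tuple]] = {}

    def register(self, arity: int, builder: Callable[..., Tuple]) -> None:
        if arity in self._builders:
            raise ValueError(
                f"a builder taking {arity} argument(s) is already registered")
        self._builders[arity] = builder

    def build(self, *args) -> Tuple:
        builder = self._builders.get(len(args))
        if builder is None:
            raise TypeError(f"no builder takes {len(args)} argument(s)")
        return builder(*args)


registry = ArityRegistry()
registry.register(1, lambda col: (col, DEFAULT_LG_NOM_ENTRIES, DEFAULT_FAMILY))
registry.register(2, lambda col, lg: (col, lg, DEFAULT_FAMILY))
registry.register(3, lambda col, lg, family: (col, lg, family))


def try_register_col_family() -> Optional[str]:
    """Attempt to add a (col, family) builder; return the error text if rejected."""
    try:
        registry.register(
            2, lambda col, family: (col, DEFAULT_LG_NOM_ENTRIES, family))
    except ValueError as err:
        return str(err)
    return None
```

In this model the (col, family) variant collides with the existing (col, lgNomEntries) variant because both have two arguments, which is why the default has to be applied somewhere other than the dispatch layer.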

Contributor

I'd recommend we remove these two overloads, theta_sketch_agg(columnName: String, family: String) and theta_sketch_agg(e: Column, family: String), completely. If we later add another argument of type String, we'll have function overloading ambiguity. Therefore, if a user wants a specific family, they must also pass lgNomEntries explicitly. This keeps the file cleaner and avoids magic numbers. It also means we need to fix both builtin.py files to avoid the same problem. I'm open to hearing what others have to say about this too.

def theta_sketch_agg(
    col: "ColumnOrName",
    lgNomEntries: Optional[Union[int, Column]] = None,
    family: Optional[str] = None,
Contributor

For the sake of consistency, we can consider changing this function to look like this:

def theta_sketch_agg(
    col: "ColumnOrName",
    lgNomEntries: Optional[Union[int, Column]] = None,
    family: Optional["ColumnOrName"] = None,
) -> Column:
    fn = "theta_sketch_agg"
    _lgNomEntries = lit(12) if lgNomEntries is None else lit(lgNomEntries)
    _family = lit("QUICKSELECT") if family is None else _to_col(family)

    return _invoke_function_over_columns(fn, col, _lgNomEntries, _family)

The same applies to the other builtin.py.
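
As a standalone illustration of the default handling in the suggestion above, here is a minimal sketch that runs without PySpark. The helper resolve_theta_sketch_args and the KNOWN_FAMILIES tuple are hypothetical; the validation step is an assumption added for the example and is not claimed to match what Spark does with an unknown family.

```python
from typing import Optional, Tuple

DEFAULT_LG_NOM_ENTRIES = 12                # same default as in the suggestion
DEFAULT_FAMILY = "QUICKSELECT"             # same default as in the suggestion
KNOWN_FAMILIES = ("QUICKSELECT", "ALPHA")  # the two families named in this PR


def resolve_theta_sketch_args(
    lg_nom_entries: Optional[int] = None,
    family: Optional[str] = None,
) -> Tuple[int, str]:
    """Fill in defaults for omitted arguments (hypothetical helper)."""
    lg = DEFAULT_LG_NOM_ENTRIES if lg_nom_entries is None else lg_nom_entries
    fam = DEFAULT_FAMILY if family is None else family
    # Assumption for this example only: reject names outside the two families.
    if fam not in KNOWN_FAMILIES:
        raise ValueError(f"unknown sketch family: {fam!r}")
    return lg, fam
```

Because both parameters default to None, a caller can set family by keyword without repeating the magic number, for example resolve_theta_sketch_args(family="ALPHA").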

Comment on lines +41 to +46
/*
* QUICKSELECT is optimized for speed and is the default choice for most use cases,
* providing faster updates and queries with slightly higher error rates. ALPHA offers
* better accuracy with slightly higher resource consumption, making it suitable when
* precision is more important than performance. The choice primarily affects the speed
* vs accuracy trade-off.
Contributor

@cboumalh cboumalh Oct 15, 2025

Nit, but this is not entirely accurate. Consider this instead:

/*
   * ALPHA is optimized for speed and offers slightly better initial accuracy
   * (lower error) for simple updates. Its estimation 
   * precision reverts to the standard level if merged with other sketches.
   * QUICKSELECT is the default and more flexible choice, providing the standard 
   * level of accuracy and full support for all set operations (Union, Intersection, etc.).
   */


override protected def withNewChildrenInternal(newFirst: Expression, newSecond: Expression,
newThird: Expression): Expression = copy(newFirst, newSecond,
newThird, mutableAggBufferOffset, inputAggBufferOffset)
Contributor

@cboumalh cboumalh Oct 15, 2025

Super nit: the withNewChildrenInternal override doesn't need to pass mutableAggBufferOffset or inputAggBufferOffset explicitly. That is redundant, because copy leaves any field that is not named unchanged. The return type should also be ThetaSketchAgg. Lastly, consider moving it up to line 146 to group it with the rest.

Something like this:

  override protected def withNewChildrenInternal(
      newFirst: Expression, newSecond: Expression, newThird: Expression): ThetaSketchAgg =
    copy(
      first = newFirst,
      second = newSecond,
      third = newThird)
}
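
For anyone who wants to convince themselves that omitted fields are carried over, here is a small Python analogy using dataclasses.replace, which behaves like a Scala case class copy with named arguments in this respect. ToyAgg and with_new_children are hypothetical stand-ins, not Spark classes.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyAgg:
    """Hypothetical stand-in for an aggregate with three children."""
    first: str
    second: str
    third: str
    mutable_agg_buffer_offset: int = 0
    input_agg_buffer_offset: int = 0

    def with_new_children(self, new_first: str, new_second: str,
                          new_third: str) -> "ToyAgg":
        # Only the children are named; both offsets are copied over as-is.
        return replace(self, first=new_first, second=new_second,
                       third=new_third)


agg = ToyAgg("col", "12", "ALPHA",
             mutable_agg_buffer_offset=3, input_agg_buffer_offset=5)
updated = agg.with_new_children("col2", "12", "QUICKSELECT")
```

Here updated has the new children while keeping the offsets 3 and 5 from agg, and agg itself is unchanged.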

@cboumalh
Contributor

Just a few comments left! Thanks @karuppayya!
