feat: randn expression support #2010


Open · akupchinskiy wants to merge 4 commits into main

Conversation

@akupchinskiy (Contributor) commented Jul 10, 2025

Which issue does this PR close?

Closes #.

Rationale for this change

Added support for the Spark randn expression (standard Gaussian random variable).
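
For illustration, a minimal usage sketch (assumes a SparkSession named spark in scope, e.g. in spark-shell; not part of the diff):

import org.apache.spark.sql.functions.randn

// randn(42) appends a column of standard-normal samples; with Comet
// enabled, this PR evaluates the expression in the native engine.
val df = spark.range(5).select(randn(42).as("gaussian"))
df.show()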

What changes are included in this PR?

Spark-compatible randn expression implementation, plus some refactoring of the non_deterministic_funcs module to extract reusable common code.

How are these changes tested?

Unit tests in the Rust and Scala code.

@codecov-commenter commented Jul 10, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 58.53%. Comparing base (f09f8af) to head (bc4576a).
Report is 327 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2010      +/-   ##
============================================
+ Coverage     56.12%   58.53%   +2.40%     
- Complexity      976     1238     +262     
============================================
  Files           119      134      +15     
  Lines         11743    13049    +1306     
  Branches       2251     2393     +142     
============================================
+ Hits           6591     7638    +1047     
- Misses         4012     4185     +173     
- Partials       1140     1226      +86     


@mbutrovich self-requested a review on July 10, 2025 at 22:02
@mbutrovich (Contributor) commented:
Looks like we might need to update this to use the new serde map to resolve the merge conflict. I can take a look tomorrow if you don't get a chance, that way we can kick off CI and give it a proper review.

Merge commit: …port

# Conflicts:
#	spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@akupchinskiy (Contributor, Author) commented:

Looks like we might need to update this to use the new serde map to resolve the merge conflict. I can take a look tomorrow if you don't get a chance, that way we can kick off CI and give it a proper review.

Thanks @mbutrovich. I have resolved the conflict

@mbutrovich (Contributor) left a comment:

Thanks @akupchinskiy! I have a few questions. The big one for me is: does the seed state per partition match Spark's behavior, in particular the life cycle? If the seed gets reset at a different interval, it seems like a complex query would yield different results.

Put differently: rand and randn have fairly simple semantics to implement in native code. What are the implications of these non-deterministic functions returning different samples than Spark?

@@ -2765,6 +2765,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("randn expression with random parameters") {
val partitionsNumber = Random.nextInt(10) + 1
@mbutrovich (Contributor):

Why random values if we only run it once? I see that "rand expression with random parameters" already did this, but it feels like we could get lucky with a simple test case only running it one time. I guess I should double check if we seed this RNG (I know we modified some other tests to use fixed seeds).

@akupchinskiy (Contributor, Author):

Why random values if we only run it once? I see that "rand expression with random parameters" already did this, but it feels like we could get lucky with a simple test case only running it one time. I guess I should double check if we seed this RNG (I know we modified some other tests to use fixed seeds).

@mbutrovich The initial idea was to test the invariant that we get the same result as Spark for arbitrary parameters. Do you think it would be better to have several explicit test cases sharing the same seed but with different batch_size / partition_size configurations? A sketch follows.
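
For concreteness, a hypothetical shape such explicit cases could take (the spark.comet.batchSize key and the suite's checkSparkAnswer helper are assumptions drawn from the existing test setup):

// Hypothetical test sketch, not part of this PR: one fixed seed,
// a small matrix of batch sizes and partition counts.
Seq(10, 8192).foreach { batchSize =>
  Seq(1, 4).foreach { numPartitions =>
    withSQLConf("spark.comet.batchSize" -> batchSize.toString) {
      val df = spark.range(1000).repartition(numPartitions).select(randn(42))
      checkSparkAnswer(df) // native result must match Spark's draws exactly
    }
  }
}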

@mbutrovich (Contributor):

Are there specific configs that seem more important to test than others? I'd want to make sure we hit those. If it's a bit more arbitrary, then I think the current setup is fine.

}
}

impl PhysicalExpr for RandnExpr {
@mbutrovich (Contributor):

Does it make sense to use ScalarUDFImpl instead of PhysicalExpr? Not sure if the behavior lines up or not.

@akupchinskiy (Contributor, Author):

At first glance, it is feasible. Let me try refactoring.

@mbutrovich (Contributor):

That would be fantastic, thank you!

@akupchinskiy (Contributor, Author) commented Jul 16, 2025:

@mbutrovich I have encountered a limitation of ScalarUDFImpl for this use case. Since a potential implementation of the scalar function is argument-less, there is no way to extract the batch size from ScalarFunctionArgs, and without this information we cannot evaluate the expression.

@akupchinskiy (Contributor, Author) commented Jul 16, 2025:

Thanks @akupchinskiy! I have a few questions. The big one for me is: does the seed state per partition match Spark's behavior, in particular the life cycle? If the seed gets reset at a different interval, it seems like a complex query would yield different results.

I believe partition-dependent state is a cornerstone of the lifecycle of all nondeterministic functions in Spark. Basically, a state initialization hook is part of the common trait implemented by all nondeterministic expressions.
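
Roughly, from memory of Spark's source (a sketch, not verbatim; Spark's generators actually use its internal XORShiftRandom rather than java.util.Random):

// Nondeterministic expressions are re-seeded per partition before any
// row is evaluated; rand/randn derive their RNG state from the
// planning-time seed plus the partition index.
trait Nondeterministic {
  protected def initializeInternal(partitionIndex: Int): Unit
}

class RandnLike(seed: Long) extends Nondeterministic {
  private var rng: java.util.Random = _ // stand-in for XORShiftRandom
  override protected def initializeInternal(partitionIndex: Int): Unit = {
    rng = new java.util.Random(seed + partitionIndex) // task retry => same stream
  }
  def next(): Double = rng.nextGaussian() // standard gaussian draw
}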

Regarding the seed reset, could you elaborate? The seed is evaluated at planning time and there is no place where it would be reset. When tasks fail, the state is reinitialized with the same value.

Put differently: rand and randn have fairly simple semantics to implement in native code. What are the implications of these non-deterministic functions returning different samples than Spark?

Returning different samples than Spark does not seem to be a big problem. Yet having reproducible samples can be critical for ML workloads or for idempotent writes that rely on the generated value as part of a key. I don't see how we can guarantee reproducibility without inter-batch state management that is resilient to runtime parameter changes (batch size, in this very case).

One thing to add: there are indeed complex queries where the output is not guaranteed to be the same even for the same seed. Under the hood, rand is just a plain function of two arguments: the seed (different for each partition) and the row number. The second argument is what makes the whole thing truly nondeterministic, since there is no guarantee on the order of incoming rows fetched from remote workers. Yet there are cases where the row order is stable across executions:

  1. Evaluation happens before any shuffle: spark.read.parquet(...).select(rand(42))
  2. The evaluation order is enforced by an explicit local sort: spark.read.parquet(...).repartition(42).sortWithinPartitions("some_unique_key").select(rand(42))

In those scenarios we do have reproducibility and I believe a native implementation should also have this property.
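
A minimal sketch of that mental model (java.util.Random standing in for Spark's internal XORShiftRandom):

import java.util.Random

// The i-th value of rand(seed) in partition p is fully determined by
// (seed + p) and i, so the output is reproducible exactly when the row
// order within each partition is stable across executions.
def randColumn(seed: Long, partitionIndex: Int, numRows: Int): Seq[Double] = {
  val rng = new Random(seed + partitionIndex) // per-partition state
  Seq.fill(numRows)(rng.nextDouble())         // i-th row gets the i-th draw
}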

@mbutrovich (Contributor) commented:

In those scenarios we do have reproducibility and I believe a native implementation should also have this property.

Thank you for the great explanation! This makes sense to me. The reset I was concerned about was if the state was held in a variable with a different life cycle, but if it just gets reinitialized from the plan then I think we're in the right place.
