feat: randn expression support #2010
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

```
@@             Coverage Diff              @@
##              main    #2010      +/-   ##
============================================
+ Coverage    56.12%   58.53%    +2.40%
- Complexity     976     1238      +262
============================================
  Files          119      134       +15
  Lines        11743    13049     +1306
  Branches      2251     2393      +142
============================================
+ Hits          6591     7638     +1047
- Misses        4012     4185      +173
- Partials      1140     1226       +86
============================================
```

View full report in Codecov by Sentry.
Looks like we might need to update this to use the new serde map to resolve the merge conflict. I can take a look tomorrow if you don't get a chance, that way we can kick off CI and give it a proper review.
…port

```
# Conflicts:
#	spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
```
Thanks @mbutrovich. I have resolved the conflict.
Thanks @akupchinskiy! I have a few questions. The big one for me is: does the seed state per partition match Spark's behavior, in particular the life cycle? If the seed gets reset at a different interval, it seems like a complex query would yield different results.
Put differently: rand and randn have fairly simple semantics to implement in native code. What are the implications of these non-deterministic functions returning different samples than Spark?
```diff
@@ -2765,6 +2765,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("randn expression with random parameters") {
+    val partitionsNumber = Random.nextInt(10) + 1
```
Why random values if we only run it once? I see that "rand expression with random parameters" already did this, but it feels like we could get lucky with a simple test case only running it one time. I guess I should double-check whether we seed this RNG (I know we modified some other tests to use fixed seeds).
@mbutrovich The initial idea was to test the invariant that we get the same result as Spark for arbitrary parameters. Do you think it would be better to have several explicit test cases sharing the same seed but with different batch_size / partition_size configurations?
Are there specific configs that seem more important to test than others? I'd want to make sure we hit those. If it's a bit more arbitrary, then I think the current setup is fine.
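One invariant worth pinning down with explicit configurations is that per-row samples should not depend on how rows are batched. A minimal Rust sketch of that property, using an illustrative xorshift64 generator as a stand-in for the real randn implementation (`samples` is a hypothetical helper, not code from this PR):

```rust
// Illustrative xorshift64 step; a stand-in for the real randn RNG.
fn xorshift(mut x: u64) -> u64 {
    x ^= x << 13;
    x ^= x >> 7;
    x ^= x << 17;
    x
}

// Hypothetical helper: produce `total` samples for one partition,
// emitted in chunks of `batch_size` (models per-batch evaluation in a
// native operator, where RNG state carries across batch boundaries).
fn samples(seed: u64, total: usize, batch_size: usize) -> Vec<u64> {
    let mut state = seed | 1; // avoid the all-zero xorshift fixpoint
    let mut out = Vec::with_capacity(total);
    for chunk_start in (0..total).step_by(batch_size) {
        let chunk_end = (chunk_start + batch_size).min(total);
        for _row in chunk_start..chunk_end {
            state = xorshift(state);
            out.push(state);
        }
    }
    out
}

fn main() {
    let seed = 42;
    let reference = samples(seed, 1000, 1000); // one big batch
    for &batch_size in &[1usize, 7, 100, 8192] {
        // Same seed, different batching => identical per-row samples.
        assert_eq!(samples(seed, 1000, batch_size), reference);
    }
    println!("per-row samples are independent of batch size");
}
```

Fixing the seed and sweeping a few batch sizes like this keeps the test deterministic while still covering the batching boundary cases.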
```rust
    }
}

impl PhysicalExpr for RandnExpr {
```
Does it make sense to use ScalarUDFImpl instead of PhysicalExpr? Not sure if the behavior lines up or not.
At first glance, it is feasible. Let me try refactoring.
That would be fantastic, thank you!
@mbutrovich I have encountered a limitation of ScalarUDFImpl for this use case. Since a potential implementation of the scalar function is argument-less, there is no way to extract the batch size from ScalarFunctionArgs, and without this information we cannot evaluate the expression.
I believe the partition-dependent state is a cornerstone for all the nondeterministic functions in the Spark lifecycle. Basically, a state initialization hook is part of the common trait of all the nondeterministic functions. Regarding the seed reset, could you elaborate more? The seed is evaluated at planning time and there is no place where it would be reset. When a task fails, the state is reinitialized again with the same value.
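The life cycle described above can be sketched in Rust. This loosely mirrors Spark's Nondeterministic trait (an assumption about shape, not code from this PR): the seed is fixed at planning time, and per-partition RNG state is rebuilt from it whenever a task starts, so a retried task replays the same samples. The xorshift is illustrative, not Spark's XorShiftRandom.

```rust
trait Nondeterministic {
    /// Called once per task before any rows are processed; a retried
    /// task calls it again with the same partition index, restoring
    /// exactly the same state.
    fn initialize(&mut self, partition_index: u64);
    fn next(&mut self) -> u64;
}

struct Rand {
    seed: u64,  // evaluated at planning time, never reset afterwards
    state: u64, // per-partition, owned by the task
}

impl Nondeterministic for Rand {
    fn initialize(&mut self, partition_index: u64) {
        // State depends only on the plan-time seed and the partition
        // index, never on wall-clock time or prior task attempts.
        self.state = self.seed.wrapping_add(partition_index) | 1;
    }
    fn next(&mut self) -> u64 {
        self.state ^= self.state << 13;
        self.state ^= self.state >> 7;
        self.state ^= self.state << 17;
        self.state
    }
}

fn main() {
    let mut expr = Rand { seed: 42, state: 0 };
    expr.initialize(3);
    let first: Vec<u64> = (0..4).map(|_| expr.next()).collect();

    // Simulated task retry: re-initialization with the same partition
    // index replays the same sample sequence.
    expr.initialize(3);
    let retry: Vec<u64> = (0..4).map(|_| expr.next()).collect();
    assert_eq!(first, retry);
    println!("retry reproduces the samples");
}
```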
Returning different samples than Spark does not seem to be a big problem. Yet, having reproducible samples can be critical for some use cases. One thing to add: there are indeed complex queries where the output is not guaranteed to be the same even for the same seed. Under the hood, rand is just a plain function of two arguments: a seed (different for each partition) and a row number in the batch. The second argument is the one making the whole thing truly nondeterministic, since there is no guarantee on the order of incoming rows fetched from remote workers. Yet, there are cases where the row number is stable across different executions. In those scenarios we do have reproducibility, and I believe a native implementation should also have this property.
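The order-sensitivity point can be demonstrated concretely. Modeling rand as a pure function of (per-partition seed, row position), which row gets which sample depends on arrival order: if a shuffle delivers the same rows in a different order, each row draws a different sample even though the multiset of samples is unchanged. The mixing function here is illustrative, not Spark's actual generator.

```rust
// Illustrative pure function of (seed, row position).
fn sample_at(seed: u64, position: u64) -> u64 {
    let mut x = seed
        .wrapping_add(position)
        .wrapping_mul(0x9E37_79B9_7F4A_7C15)
        | 1;
    x ^= x >> 30;
    x ^= x << 13;
    x
}

// Look up the sample that a given row received in one run.
fn value_of(run: &[(String, u64)], key: &str) -> u64 {
    run.iter().find(|(r, _)| r.as_str() == key).unwrap().1
}

fn main() {
    let seed = 42u64;
    let assign = |order: &[&str]| -> Vec<(String, u64)> {
        order
            .iter()
            .enumerate()
            .map(|(i, row)| (row.to_string(), sample_at(seed, i as u64)))
            .collect()
    };

    let run1 = assign(&["a", "b", "c"]);
    let run2 = assign(&["c", "a", "b"]); // same rows, different order

    // Row "a" sits at position 0 in run1 but position 1 in run2,
    // so it draws a different sample despite the identical seed.
    assert_ne!(value_of(&run1, "a"), value_of(&run2, "a"));
    println!("samples follow arrival order, not row identity");
}
```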
Thank you for the great explanation! This makes sense to me. The reset I was concerned about was whether the state was held in a variable with a different life cycle, but if it just gets reinitialized from the plan then I think we're in the right place.
Which issue does this PR close?
Closes #.
Rationale for this change
Added support for the Spark randn expression (standard Gaussian random variable).
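For context, the sampling step behind randn can be sketched with the Marsaglia polar method, the approach used by java.util.Random.nextGaussian (which Spark's randn ultimately builds on). This is a hedged illustration, not the PR's implementation; the uniform source is an illustrative xorshift64, not Spark's XorShiftRandom.

```rust
struct Rng {
    state: u64,
}

impl Rng {
    fn next_f64(&mut self) -> f64 {
        self.state ^= self.state << 13;
        self.state ^= self.state >> 7;
        self.state ^= self.state << 17;
        // Map the top 53 bits to a uniform double in [0, 1).
        (self.state >> 11) as f64 / (1u64 << 53) as f64
    }

    // Marsaglia polar method: rejection-sample a point inside the unit
    // circle, then transform it into a standard normal deviate.
    fn next_gaussian(&mut self) -> f64 {
        loop {
            let u = 2.0 * self.next_f64() - 1.0;
            let v = 2.0 * self.next_f64() - 1.0;
            let s = u * u + v * v;
            if s > 0.0 && s < 1.0 {
                return u * (-2.0 * s.ln() / s).sqrt();
            }
        }
    }
}

fn main() {
    let mut rng = Rng { state: 42 | 1 };
    let n = 100_000;
    let samples: Vec<f64> = (0..n).map(|_| rng.next_gaussian()).collect();
    let mean = samples.iter().sum::<f64>() / n as f64;
    let var = samples.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>() / n as f64;
    // A standard Gaussian sample has mean ~0 and variance ~1.
    println!("mean={mean:.3} var={var:.3}");
    assert!(mean.abs() < 0.05 && (var - 1.0).abs() < 0.1);
}
```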
What changes are included in this PR?
Spark-compatible randn expression implementation and some refactoring of the non_deterministic_funcs module to extract reusable common code.
How are these changes tested?
Unit tests in the Rust and Scala code.