
Commit 828d214

Adds RewriteTimeCastToTimestampNTZ rule
1 parent f60c6cb commit 828d214

4 files changed: +97 −20 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 10 deletions
@@ -19,22 +19,14 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util
 import java.util.Locale
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Random, Success, Try}
-
 import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.analysis.resolver.{
-  AnalyzerBridgeState,
-  HybridAnalyzer,
-  Resolver => OperatorResolver,
-  ResolverExtension,
-  ResolverGuard
-}
+import org.apache.spark.sql.catalyst.analysis.resolver.{AnalyzerBridgeState, HybridAnalyzer, ResolverExtension, ResolverGuard, Resolver => OperatorResolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
@@ -49,7 +41,7 @@ import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, trimTempResolvedColumn, CharVarcharUtils}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, toPrettySQL, trimTempResolvedColumn}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.{View => _, _}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -483,6 +475,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         ResolveWithCTE,
         ExtractDistributedSequenceID) ++
       Seq(ResolveUpdateEventTimeWatermarkColumn) ++
+      Seq(RewriteTimeCastToTimestampNTZ) ++
       extendedResolutionRules : _*),
     Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn),
     Batch("Post-Hoc Resolution", Once,
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteTimeCastToTimestampNTZ.scala

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Cast, CurrentDate, MakeTimestampNTZ}
+import org.apache.spark.sql.catalyst.optimizer.ComputeCurrentTime
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.{TimestampNTZType, TimeType}
+
+/**
+ * Rewrites a cast from [[TimeType]] to [[TimestampNTZType]] into a [[MakeTimestampNTZ]]
+ * expression.
+ *
+ * The conversion from TIME to TIMESTAMP_NTZ requires a date component, which TIME itself does not
+ * provide. This rule injects [[CurrentDate]] as the implicit date part, effectively treating the
+ * TIME value as a time of day on the current date. This rewrite ensures that all such casts
+ * within a query use a consistent date, as required by the [[ComputeCurrentTime]] rule which
+ * replaces [[CurrentDate]] with a fixed value during analysis.
+ *
+ * For example, the following SQL:
+ * {{{
+ *   SELECT CAST(make_time(12, 30, 0) AS TIMESTAMP_NTZ)
+ * }}}
+ * will be rewritten to:
+ * {{{
+ *   SELECT make_timestamp_ntz(current_date, make_time(12, 30, 0))
+ * }}}
+ *
+ * This transformation must happen during resolution, before expression evaluation or
+ * optimization.
+ */
+object RewriteTimeCastToTimestampNTZ extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    case c @ Cast(child, TimestampNTZType, _, _) if child.dataType.isInstanceOf[TimeType] =>
+      MakeTimestampNTZ(CurrentDate(), child)
+  }
+}
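
Because the rewrite uses transformAllExpressions, casts nested inside larger expressions are handled as well, not just top-level projections. A small standalone sketch, not part of the commit (the object name is illustrative):

import org.apache.spark.sql.catalyst.analysis.RewriteTimeCastToTimestampNTZ
import org.apache.spark.sql.catalyst.expressions.{Cast, IsNull, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation}
import org.apache.spark.sql.types.{TimestampNTZType, TimeType}

object NestedTimeCastRewriteSketch {
  def main(args: Array[String]): Unit = {
    // 15:30:00 expressed as nanoseconds of day.
    val time = Literal(55800L * 1_000_000_000L, TimeType(6))

    // The cast sits inside a predicate rather than a top-level projection.
    val plan = Filter(IsNull(Cast(time, TimestampNTZType)), OneRowRelation())

    // transformAllExpressions visits nested sub-expressions, so the inner Cast is
    // still replaced by MakeTimestampNTZ(CurrentDate(), time).
    println(RewriteTimeCastToTimestampNTZ(plan).treeString)
  }
}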

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 0 additions & 10 deletions
@@ -705,16 +705,6 @@ case class Cast(
       buildCast[Int](_, d => daysToMicros(d, ZoneOffset.UTC))
     case TimestampType =>
       buildCast[Long](_, ts => convertTz(ts, ZoneOffset.UTC, zoneId))
-    case _: TimeType =>
-      val currentDay = DateTimeUtils.currentDate(zoneId)
-      buildCast[Long](
-        _,
-        nanos => {
-          val NANOS_PER_DAY = 86_400_000_000_000L // 24 * 60 * 60 * 1_000_000_000
-          val nanosOfDay = ((nanos % NANOS_PER_DAY) + NANOS_PER_DAY) % NANOS_PER_DAY
-          DateTimeUtils.makeTimestampNTZ(currentDay, nanosOfDay)
-        }
-      )
   }
 
   private[this] def decimalToTimestamp(d: Decimal): Long = {
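
The removed branch normalized an arbitrary nanosecond count into nanoseconds-of-day before attaching the current date; that work is now delegated to MakeTimestampNTZ via the analyzer rewrite. A minimal sketch of just the wrap-around arithmetic from the removed code, standalone and not Spark code:

object NanosOfDaySketch {
  // 24 * 60 * 60 * 1_000_000_000, as in the removed Cast branch.
  private val NanosPerDay = 86_400_000_000_000L

  // Wraps any nanosecond count into the range [0, NanosPerDay), including negative inputs.
  def nanosOfDay(nanos: Long): Long = ((nanos % NanosPerDay) + NanosPerDay) % NanosPerDay

  def main(args: Array[String]): Unit = {
    println(nanosOfDay(55800L * 1_000_000_000L)) // 15:30:00 stays in range unchanged
    println(nanosOfDay(-3_600_000_000_000L))     // -1 hour wraps around to 23:00:00
  }
}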
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/RewriteTimeCastToTimestampNTZSuite.scala

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, CurrentDate, Literal, MakeTimestampNTZ}
+import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
+import org.apache.spark.sql.types.{TimestampNTZType, TimeType}
+
+class RewriteTimeCastToTimestampNTZSuite extends AnalysisTest {
+
+  test("SPARK-52617: RewriteTimeCastToTimestampNTZ rewrites TIME casts to MakeTimestampNTZ") {
+
+    // TIME 15:30:00 as nanoseconds of day: (15 * 3600 + 30 * 60) seconds = 55800 seconds.
+    val nanos = 55800L * 1_000_000_000L
+    val timeLiteral = Literal(nanos, TimeType(6))
+
+    val originalPlan =
+      Project(Seq(Alias(Cast(timeLiteral, TimestampNTZType), "ts")()), OneRowRelation())
+    val expectedPlan = Project(
+      Seq(
+        Alias(MakeTimestampNTZ(CurrentDate(), timeLiteral), "ts")()),
+      OneRowRelation())
+
+    val rewrittenPlan = RewriteTimeCastToTimestampNTZ(originalPlan)
+    comparePlans(rewrittenPlan, expectedPlan)
+  }
+}
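
The suite covers the positive case. A hypothetical companion check, not part of this commit, would confirm the rule is a no-op when the cast's child is not a TIME value, since the guard tests child.dataType.isInstanceOf[TimeType]; suite and test names below are illustrative:

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
import org.apache.spark.sql.types.TimestampNTZType

class RewriteTimeCastToTimestampNTZNoOpSuite extends AnalysisTest {
  test("casts from non-TIME types are left untouched") {
    val strCast = Cast(Literal("2025-01-01 12:30:00"), TimestampNTZType)
    val plan = Project(Seq(Alias(strCast, "ts")()), OneRowRelation())
    // The child is a string, not a TIME value, so the rule should not change the plan.
    comparePlans(RewriteTimeCastToTimestampNTZ(plan), plan)
  }
}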
