[SPARK-52693][SQL] Support +/- ANSI day-time intervals to/from TIME #51383

Closed · wants to merge 8 commits
@@ -45,4 +45,5 @@ public class DateTimeConstants {
public static final long NANOS_PER_MICROS = 1000L;
public static final long NANOS_PER_MILLIS = MICROS_PER_MILLIS * NANOS_PER_MICROS;
public static final long NANOS_PER_SECOND = MILLIS_PER_SECOND * NANOS_PER_MILLIS;
public static final long NANOS_PER_DAY = MICROS_PER_DAY * NANOS_PER_MICROS;
}
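
For reference, the new constant just composes the existing ones; a quick worked check of the value (a sketch outside the diff, assuming the usual MICROS_PER_DAY = 86400000000L and NANOS_PER_MICROS = 1000L defined in this class):

// NANOS_PER_DAY = MICROS_PER_DAY * NANOS_PER_MICROS
val nanosPerDay: Long = 86400000000L * 1000L
assert(nanosPerDay == 86400000000000L)   // 24 * 60 * 60 * 1e9 nanoseconds in a day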

@@ -181,7 +181,7 @@ private[spark] object AnsiIntervalType extends AbstractDataType {
*/
private[sql] abstract class AnyTimeType extends DatetimeType

private[spark] object AnyTimeType extends AbstractDataType {
private[spark] object AnyTimeType extends AbstractDataType with Serializable {
dongjoon-hyun (Member) commented on Jul 7, 2025:

Just a question. Is this Serializable required for this PR?

MaxGekk (Member Author) replied:

Yes, otherwise StaticInvoke fails with:

java.io.NotSerializableException: org.apache.spark.sql.types.AnyTimeType$
Serialization stack:
	- object not serializable (class: org.apache.spark.sql.types.AnyTimeType$, value: org.apache.spark.sql.types.AnyTimeType$@1929425f)
	- writeObject data (class: scala.collection.generic.DefaultSerializationProxy)
	- object (class scala.collection.generic.DefaultSerializationProxy, scala.collection.generic.DefaultSerializationProxy@29d37757)
	- writeReplace data (class: scala.collection.generic.DefaultSerializationProxy)
	- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.sql.types.AnyTimeType$@1929425f, IntegerType, DayTimeIntervalType, ByteType, IntegerType))
	- field (class: org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke, name: inputTypes, type: interface scala.collection.immutable.Seq)
	- object (class org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke, static_invoke(DateTimeUtils.timeAddInterval(null, 6, INTERVAL '0 01:00:00' DAY TO SECOND, 3, 6)))
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:43)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:50)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:122)
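
For context, a minimal standalone sketch of that failure mode (hypothetical singleton names, plain JDK serialization instead of Spark's JavaSerializer): a plain Scala object is not Serializable by default, and a Scala 2.13 List fails to serialize as soon as one of its elements is not.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object PlainSingleton                              // like AnyTimeType before this change
object MarkedSingleton extends Serializable        // like AnyTimeType after this change

object SerializationDemo extends App {
  def write(value: AnyRef): Unit =
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value)

  write(List(MarkedSingleton))                     // succeeds
  try write(List(PlainSingleton))                  // the List's serialization proxy hits the plain object
  catch { case e: NotSerializableException => println(s"fails like the PR: $e") }
}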

override private[sql] def simpleString: String = "time"

override private[sql] def acceptsType(other: DataType): Boolean =

@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.{
Subtract,
SubtractDates,
SubtractTimestamps,
TimeAddInterval,
TimestampAddInterval,
TimestampAddYMInterval,
UnaryMinus
@@ -53,6 +54,7 @@ import org.apache.spark.sql.types.{
StringType,
TimestampNTZType,
TimestampType,
TimeType,
YearMonthIntervalType
}
import org.apache.spark.sql.types.DayTimeIntervalType.DAY
@@ -80,6 +82,8 @@ object BinaryArithmeticWithDatetimeResolver {
a.copy(right = Cast(a.right, a.left.dataType))
case (DateType, CalendarIntervalType) =>
DateAddInterval(l, r, ansiEnabled = mode == EvalMode.ANSI)
case (_: TimeType, _: DayTimeIntervalType) => TimeAddInterval(l, r)
case (_: DayTimeIntervalType, _: TimeType) => TimeAddInterval(r, l)
case (_, CalendarIntervalType | _: DayTimeIntervalType) =>
Cast(TimestampAddInterval(l, r), l.dataType)
case (CalendarIntervalType, DateType) =>
@@ -118,6 +122,8 @@ object BinaryArithmeticWithDatetimeResolver {
ansiEnabled = mode == EvalMode.ANSI
)
)
case (_: TimeType, _: DayTimeIntervalType) =>
TimeAddInterval(l, UnaryMinus(r, mode == EvalMode.ANSI))
case (_, CalendarIntervalType | _: DayTimeIntervalType) =>
Cast(DatetimeSub(l, r,
TimestampAddInterval(l, UnaryMinus(r, mode == EvalMode.ANSI))), l.dataType)
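
End to end, these resolver rules make the following queries analyze and evaluate; a hedged sketch assuming a SparkSession named spark built from this branch (expected values follow from the tests and golden files further down):

// TIME + interval and interval + TIME both resolve to TimeAddInterval,
// and TIME - interval is rewritten as TIME + (-interval).
spark.sql("SELECT TIME'08:31' + INTERVAL '30' MINUTE").show()                       // 09:01:00
spark.sql("SELECT INTERVAL '30' MINUTE + TIME'08:31'").show()                       // same result, operands swapped
spark.sql("SELECT TIME'12:30' - INTERVAL '12:29:59.000001' HOUR TO SECOND").show()  // 00:00:00.999999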

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.time.DateTimeException
import java.util.Locale

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, TypeCheckResult}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
@@ -32,7 +33,8 @@ import org.apache.spark.sql.catalyst.util.TimeFormatter
import org.apache.spark.sql.catalyst.util.TypeUtils.ordinalNumber
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.types.StringTypeWithCollation
import org.apache.spark.sql.types.{AbstractDataType, AnyTimeType, DataType, DecimalType, IntegerType, ObjectType, TimeType}
import org.apache.spark.sql.types.{AbstractDataType, AnyTimeType, ByteType, DataType, DayTimeIntervalType, DecimalType, IntegerType, ObjectType, TimeType}
import org.apache.spark.sql.types.DayTimeIntervalType.SECOND
import org.apache.spark.unsafe.types.UTF8String

/**
@@ -563,3 +565,44 @@ case class MakeTime(
override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): MakeTime =
copy(hours = newChildren(0), minutes = newChildren(1), secsAndMicros = newChildren(2))
}

/**
* Adds a day-time interval to a time.
*/
case class TimeAddInterval(time: Expression, interval: Expression)
extends BinaryExpression with RuntimeReplaceable with ExpectsInputTypes {
override def nullIntolerant: Boolean = true

override def left: Expression = time
override def right: Expression = interval

override def toString: String = s"$left + $right"
override def sql: String = s"${left.sql} + ${right.sql}"
override def inputTypes: Seq[AbstractDataType] = Seq(AnyTimeType, DayTimeIntervalType)

override def replacement: Expression = {
val (timePrecision, intervalEndField) = (time.dataType, interval.dataType) match {
case (TimeType(p), DayTimeIntervalType(_, endField)) => (p, endField)
case _ => throw SparkException.internalError("Unexpected input types: " +
s"time type ${time.dataType.sql}, interval type ${interval.dataType.sql}.")
}
val intervalPrecision = if (intervalEndField < SECOND) {
TimeType.MIN_PRECISION
} else {
TimeType.MICROS_PRECISION
}
val targetPrecision = Math.max(timePrecision, intervalPrecision)
StaticInvoke(
classOf[DateTimeUtils.type],
TimeType(targetPrecision),
"timeAddInterval",
Seq(time, Literal(timePrecision), interval, Literal(intervalEndField),
Literal(targetPrecision)),
Seq(AnyTimeType, IntegerType, DayTimeIntervalType, ByteType, IntegerType),
propagateNull = nullIntolerant)
}

override protected def withNewChildrenInternal(
newTime: Expression, newInterval: Expression): TimeAddInterval =
copy(time = newTime, interval = newInterval)
}
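
The result type is the wider of the two inputs: an interval whose end field is DAY, HOUR or MINUTE contributes no fractional seconds, while a SECOND-ended interval carries microseconds. A small sketch of how targetPrecision falls out (assuming, as in Spark's TimeType, MIN_PRECISION = 0 and MICROS_PRECISION = 6):

import org.apache.spark.sql.types.DayTimeIntervalType

// endField < SECOND -> interval precision 0; endField == SECOND -> interval precision 6
def targetPrecision(timePrecision: Int, intervalEndField: Byte): Int = {
  val intervalPrecision = if (intervalEndField < DayTimeIntervalType.SECOND) 0 else 6
  math.max(timePrecision, intervalPrecision)
}

targetPrecision(0, DayTimeIntervalType.DAY)     // 0: TIME(0) + DAY interval stays TIME(0)
targetPrecision(0, DayTimeIntervalType.SECOND)  // 6: the result widens to TIME(6)
targetPrecision(6, DayTimeIntervalType.HOUR)    // 6: already at microsecond precision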

@@ -834,4 +834,31 @@ object DateTimeUtils extends SparkDateTimeUtils {
def makeTimestampNTZ(days: Int, nanos: Long): Long = {
localDateTimeToMicros(LocalDateTime.of(daysToLocalDate(days), nanosToLocalTime(nanos)))
}

/**
* Adds a day-time interval to a time.
*
* @param time A time in nanoseconds.
* @param timePrecision The number of digits of the fraction part of time.
* @param interval A day-time interval in microseconds.
* @param intervalEndField The rightmost field that the interval comprises.
* Valid values: 0 (DAY), 1 (HOUR), 2 (MINUTE), 3 (SECOND).
* @param targetPrecision The number of digits of the fraction part of the resulting time.
* @return A time value in nanoseconds, or throws an arithmetic overflow error
* if the result is out of the valid time range [00:00, 24:00).

A contributor commented:

so if the day-time interval has the day field, it will always overflow? Shall we check the start field of the day-time interval at analysis time to make sure it's not DAY?

MaxGekk (Member Author) replied on Jul 7, 2025:

> so if the day-time interval has the day field, it will always overflow?

Not always. Values in the day field can be 0. In that case, it couldn't overflow.

> Shall we check the start field of the day-time interval at analysis time to make sure it's not DAY?

Maybe, do you mean the end field? And prohibit DayTimeIntervalType(DAY, DAY)? Even with that type, users might construct valid expressions like TIME'12:30' + INTERVAL '0' DAY. The SQL standard says nothing about that case (@srielau Am I right?). @dongjoon-hyun @yaooqinn @cloud-fan WDYT, should we disallow such day intervals?

A member replied:

As I see it, DATETIME_OVERFLOW is simple and sufficient.

The contributor replied:

OK, let's keep it a runtime error then.

*/
def timeAddInterval(
time: Long,
timePrecision: Int,
interval: Long,
intervalEndField: Byte,
targetPrecision: Int): Long = {
val result = MathUtils.addExact(time, MathUtils.multiplyExact(interval, NANOS_PER_MICROS))
if (0 <= result && result < NANOS_PER_DAY) {
truncateTimeToPrecision(result, targetPrecision)
} else {
throw QueryExecutionErrors.timeAddIntervalOverflowError(
time, timePrecision, interval, intervalEndField)
}
}
}
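
To make the review discussion above concrete, a short sketch of the arithmetic with illustrative values: the time operand is nanoseconds since midnight and the interval is microseconds, so the interval is scaled by NANOS_PER_MICROS before the [00:00, 24:00) range check. A zero day field stays in range, while a whole day added to any nonzero time overflows at runtime:

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.DayTimeIntervalType.DAY

val noon1230: Long = (12L * 3600 + 30L * 60) * 1000000000L        // TIME'12:30' as nanoseconds

// INTERVAL '0' DAY (0 microseconds): valid, returns 12:30 unchanged.
DateTimeUtils.timeAddInterval(noon1230, 6, 0L, DAY, 6)

// INTERVAL '1' DAY (86400000000 microseconds): 12:30 + 24h >= 24:00, so this
// throws a SparkArithmeticException with the DATETIME_OVERFLOW error condition.
// DateTimeUtils.timeAddInterval(noon1230, 6, 86400L * 1000000, DAY, 6)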

@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval
import org.apache.spark.sql.catalyst.trees.{Origin, TreeNode}
import org.apache.spark.sql.catalyst.util.{sideBySide, CharsetProvider, DateTimeUtils, FailFastMode, MapData}
import org.apache.spark.sql.catalyst.util.{sideBySide, CharsetProvider, DateTimeUtils, FailFastMode, IntervalUtils, MapData}
import org.apache.spark.sql.connector.catalog.{CatalogNotFoundException, Table, TableProvider}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.expressions.Transform
@@ -2534,6 +2534,21 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
summary = "")
}

def timeAddIntervalOverflowError(
time: Long,
timePrecision: Int,
interval: Long,
intervalEndField: Byte): ArithmeticException = {
val i = toSQLValue(IntervalUtils.microsToDuration(interval),
DayTimeIntervalType(intervalEndField))
val t = toSQLValue(DateTimeUtils.nanosToLocalTime(time), TimeType(timePrecision))
new SparkArithmeticException(
errorClass = "DATETIME_OVERFLOW",
messageParameters = Map("operation" -> s"add $i to the time value $t"),
context = Array.empty,
summary = "")
}

def invalidBucketFile(path: String): Throwable = {
new SparkException(
errorClass = "INVALID_BUCKET_FILE",

@@ -17,14 +17,15 @@

package org.apache.spark.sql.catalyst.expressions

import java.time.LocalTime
import java.time.{Duration, LocalTime}

import org.apache.spark.{SPARK_DOC_ROOT, SparkDateTimeException, SparkFunSuite}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLId, toSQLValue}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, StringType, TimeType}
import org.apache.spark.sql.types.{DayTimeIntervalType, Decimal, DecimalType, IntegerType, StringType, TimeType}
import org.apache.spark.sql.types.DayTimeIntervalType.{DAY, HOUR, SECOND}

class TimeExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("ParseToTime") {
@@ -364,4 +365,32 @@ class TimeExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
(child: Expression) => SecondsOfTimeWithFraction(child).replacement,
TimeType())
}

test("Add ANSI day-time intervals to TIME") {
checkEvaluation(
TimeAddInterval(Literal.create(null, TimeType()), Literal(Duration.ofHours(1))),
null)
checkEvaluation(
TimeAddInterval(Literal(LocalTime.of(12, 30)), Literal(null, DayTimeIntervalType(SECOND))),
null)
checkEvaluation(
TimeAddInterval(Literal(LocalTime.of(8, 31)), Literal(Duration.ofMinutes(30))),
LocalTime.of(8, 31).plusMinutes(30))
// Maximum precision of TIME and DAY-TIME INTERVAL
assert(TimeAddInterval(
Literal(0L, TimeType(0)),
Literal(0L, DayTimeIntervalType(DAY))).dataType == TimeType(0))
assert(TimeAddInterval(
Literal(1L, TimeType(TimeType.MAX_PRECISION)),
Literal(1L, DayTimeIntervalType(HOUR))).dataType == TimeType(TimeType.MAX_PRECISION))
assert(TimeAddInterval(
Literal(2L, TimeType(TimeType.MIN_PRECISION)),
Literal(2L, DayTimeIntervalType(SECOND))).dataType == TimeType(TimeType.MICROS_PRECISION))
assert(TimeAddInterval(
Literal(3L, TimeType(TimeType.MAX_PRECISION)),
Literal(3L, DayTimeIntervalType(SECOND))).dataType == TimeType(TimeType.MAX_PRECISION))
checkConsistencyBetweenInterpretedAndCodegenAllowingException(
(time: Expression, interval: Expression) => TimeAddInterval(time, interval).replacement,
TimeType(), DayTimeIntervalType())
}
}

@@ -26,12 +26,15 @@ import java.util.concurrent.TimeUnit
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._

import org.apache.spark.{SparkDateTimeException, SparkFunSuite, SparkIllegalArgumentException}
import org.apache.spark.{SparkArithmeticException, SparkDateTimeException, SparkFunSuite, SparkIllegalArgumentException}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseJulianToGregorianMicros
import org.apache.spark.sql.errors.DataTypeErrors.toSQLConf
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types.DayTimeIntervalType.{HOUR, SECOND}
import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

@@ -1250,4 +1253,45 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
assert(truncateTimeToPrecision(localTime(23, 59, 59, 987654), 1) ==
localTime(23, 59, 59, 900000))
}

test("add day-time interval to time") {
assert(timeAddInterval(0, 0, 0, SECOND, 6) == localTime())
assert(timeAddInterval(0, 6, MICROS_PER_DAY - 1, SECOND, 6) ==
localTime(23, 59, 59, 999999))
assert(timeAddInterval(localTime(23, 59, 59, 999999), 0, -MICROS_PER_DAY + 1, SECOND, 6) ==
localTime(0, 0))
assert(timeAddInterval(localTime(12, 30, 43, 123400), 4, 10 * MICROS_PER_MINUTE, SECOND, 6) ==
localTime(12, 40, 43, 123400))
assert(timeAddInterval(localTime(19, 31, 45, 123450), 5, 6, SECOND, 6) ==
localTime(19, 31, 45, 123456))
assert(timeAddInterval(localTime(1, 2, 3, 1), 6, MICROS_PER_HOUR, HOUR, 6) ==
localTime(2, 2, 3, 1))

checkError(
exception = intercept[SparkArithmeticException] {
timeAddInterval(1, 6, MICROS_PER_DAY, SECOND, 6)
},
condition = "DATETIME_OVERFLOW",
parameters = Map("operation" ->
"add INTERVAL '86400' SECOND to the time value TIME '00:00:00.000000001'")
)
checkError(
exception = intercept[SparkArithmeticException] {
timeAddInterval(0, 0, -1, SECOND, 6)
},
condition = "DATETIME_OVERFLOW",
parameters = Map("operation" ->
"add INTERVAL '-00.000001' SECOND to the time value TIME '00:00:00'")
)
checkError(
exception = intercept[SparkArithmeticException] {
timeAddInterval(0, 0, Long.MaxValue, SECOND, 6)
},
condition = "ARITHMETIC_OVERFLOW",
parameters = Map(
"message" -> "long overflow",
"alternative" -> "",
"config" -> toSQLConf(SqlApiConf.ANSI_ENABLED_KEY))
)
}
}

sql/core/src/test/resources/sql-tests/analyzer-results/time.sql.out (101 additions, 0 deletions)
@@ -420,3 +420,104 @@ SELECT cast(time'11:59:59.999999' as time without time zone)
-- !query analysis
Project [cast(11:59:59.999999 as time(6)) AS CAST(TIME '11:59:59.999999' AS TIME(6))#x]
+- OneRowRelation


-- !query
SELECT '12:43:33.1234' :: TIME(4) + INTERVAL '01:04:05.56' HOUR TO SECOND
-- !query analysis
Project [cast(12:43:33.1234 as time(4)) + INTERVAL '01:04:05.56' HOUR TO SECOND AS CAST(12:43:33.1234 AS TIME(4)) + INTERVAL '01:04:05.56' HOUR TO SECOND#x]
+- OneRowRelation


-- !query
SELECT TIME'08:30' + NULL
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES",
"sqlState" : "42K09",
"messageParameters" : {
"left" : "\"TIME(6)\"",
"right" : "\"VOID\"",
"sqlExpr" : "\"(TIME '08:30:00' + NULL)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 25,
"fragment" : "TIME'08:30' + NULL"
} ]
}


-- !query
SELECT TIME'00:00:00.0101' + 1
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES",
"sqlState" : "42K09",
"messageParameters" : {
"left" : "\"TIME(6)\"",
"right" : "\"INT\"",
"sqlExpr" : "\"(TIME '00:00:00.0101' + 1)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 30,
"fragment" : "TIME'00:00:00.0101' + 1"
} ]
}


-- !query
SELECT TIME'12:30' - INTERVAL '12:29:59.000001' HOUR TO SECOND
-- !query analysis
Project [12:30:00 + -INTERVAL '12:29:59.000001' HOUR TO SECOND AS TIME '12:30:00' + (- INTERVAL '12:29:59.000001' HOUR TO SECOND)#x]
+- OneRowRelation


-- !query
SELECT '23:59:59.999999' :: TIME - INTERVAL '23:59:59.999999' HOUR TO SECOND
-- !query analysis
Project [cast(23:59:59.999999 as time(6)) + -INTERVAL '23:59:59.999999' HOUR TO SECOND AS CAST(23:59:59.999999 AS TIME(6)) + (- INTERVAL '23:59:59.999999' HOUR TO SECOND)#x]
+- OneRowRelation


-- !query
SELECT '00:00:00.0001' :: TIME(4) - INTERVAL '0 00:00:00.0001' DAY TO SECOND
-- !query analysis
Project [cast(00:00:00.0001 as time(4)) + -INTERVAL '0 00:00:00.0001' DAY TO SECOND AS CAST(00:00:00.0001 AS TIME(4)) + (- INTERVAL '0 00:00:00.0001' DAY TO SECOND)#x]
+- OneRowRelation


-- !query
SELECT '08:30' :: TIME(0) - INTERVAL '6' HOUR
-- !query analysis
Project [cast(08:30 as time(0)) + -INTERVAL '06' HOUR AS CAST(08:30 AS TIME(0)) + (- INTERVAL '06' HOUR)#x]
+- OneRowRelation


-- !query
SELECT '10:00:01' :: TIME(1) - INTERVAL '1' MONTH
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES",
"sqlState" : "42K09",
"messageParameters" : {
"left" : "\"TIME(1)\"",
"right" : "\"INTERVAL MONTH\"",
"sqlExpr" : "\"(CAST(10:00:01 AS TIME(1)) - INTERVAL '1' MONTH)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 49,
"fragment" : "'10:00:01' :: TIME(1) - INTERVAL '1' MONTH"
} ]
}