diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index a7bfcfcfea62b..342eefa1a6f63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import java.util.Collections -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException @@ -40,14 +40,13 @@ class DataSourceV2DataFrameSuite import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import testImplicits._ - before { - spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) - spark.conf.set("spark.sql.catalog.testcat2", classOf[InMemoryTableCatalog].getName) - } + override protected def sparkConf: SparkConf = super.sparkConf + .set(SQLConf.ANSI_ENABLED, true) + .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) + .set("spark.sql.catalog.testcat2", classOf[InMemoryTableCatalog].getName) after { spark.sessionState.catalogManager.reset() - spark.sessionState.conf.clear() } override protected val catalogAndNamespace: String = "testcat.ns1.ns2.tbls" @@ -352,185 +351,180 @@ class DataSourceV2DataFrameSuite test("create/replace table with complex foldable default values") { val tableName = "testcat.ns1.ns2.tbl" withTable(tableName) { - withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { - val createExec = executeAndKeepPhysicalPlan[CreateTableExec] { - sql( - s""" - |CREATE TABLE $tableName ( - | id INT, - | salary INT DEFAULT (100 + 23), - | dep STRING DEFAULT ('h' || 'r'), - | active BOOLEAN DEFAULT CAST(1 AS BOOLEAN) - |) USING foo - |""".stripMargin) - } + val createExec = executeAndKeepPhysicalPlan[CreateTableExec] { + sql( + s""" + |CREATE TABLE $tableName ( + | id INT, + | salary INT DEFAULT (100 + 23), + | dep STRING DEFAULT ('h' || 'r'), + | active BOOLEAN DEFAULT CAST(1 AS BOOLEAN) + |) USING foo + |""".stripMargin) + } - checkDefaultValues( - createExec.columns, - Array( - null, - new ColumnDefaultValue( - "(100 + 23)", - new GeneralScalarExpression( - "+", - Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))), - LiteralValue(123, IntegerType)), - new ColumnDefaultValue( - "('h' || 'r')", - new GeneralScalarExpression( - "CONCAT", - Array( - LiteralValue(UTF8String.fromString("h"), StringType), - LiteralValue(UTF8String.fromString("r"), StringType))), - LiteralValue(UTF8String.fromString("hr"), StringType)), - new ColumnDefaultValue( - "CAST(1 AS BOOLEAN)", - new V2Cast(LiteralValue(1, IntegerType), IntegerType, BooleanType), - LiteralValue(true, BooleanType)))) + checkDefaultValues( + createExec.columns, + Array( + null, + new ColumnDefaultValue( + "(100 + 23)", + new GeneralScalarExpression( + "+", + Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))), + LiteralValue(123, IntegerType)), + new ColumnDefaultValue( + "('h' || 'r')", + new GeneralScalarExpression( + "CONCAT", + Array( + LiteralValue(UTF8String.fromString("h"), StringType), + LiteralValue(UTF8String.fromString("r"), StringType))), + LiteralValue(UTF8String.fromString("hr"), StringType)), + new ColumnDefaultValue( + "CAST(1 AS BOOLEAN)", + new V2Cast(LiteralValue(1, IntegerType), IntegerType, BooleanType), + LiteralValue(true, BooleanType)))) - val df1 = Seq(1).toDF("id") - df1.writeTo(tableName).append() + val df1 = Seq(1).toDF("id") + df1.writeTo(tableName).append() - sql(s"ALTER TABLE $tableName ALTER COLUMN dep SET DEFAULT ('i' || 't')") + sql(s"ALTER TABLE $tableName ALTER COLUMN dep SET DEFAULT ('i' || 't')") - val df2 = Seq(2).toDF("id") - df2.writeTo(tableName).append() + val df2 = Seq(2).toDF("id") + df2.writeTo(tableName).append() - checkAnswer( - sql(s"SELECT * FROM $tableName"), - Seq( - Row(1, 123, "hr", true), - Row(2, 123, "it", true))) + checkAnswer( + sql(s"SELECT * FROM $tableName"), + Seq( + Row(1, 123, "hr", true), + Row(2, 123, "it", true))) - val replaceExec = executeAndKeepPhysicalPlan[ReplaceTableExec] { - sql( - s""" - |REPLACE TABLE $tableName ( - | id INT, - | salary INT DEFAULT (50 * 2), - | dep STRING DEFAULT ('un' || 'known'), - | active BOOLEAN DEFAULT CAST(0 AS BOOLEAN) - |) USING foo - |""".stripMargin) - } + val replaceExec = executeAndKeepPhysicalPlan[ReplaceTableExec] { + sql( + s""" + |REPLACE TABLE $tableName ( + | id INT, + | salary INT DEFAULT (50 * 2), + | dep STRING DEFAULT ('un' || 'known'), + | active BOOLEAN DEFAULT CAST(0 AS BOOLEAN) + |) USING foo + |""".stripMargin) + } - checkDefaultValues( - replaceExec.columns, - Array( - null, - new ColumnDefaultValue( - "(50 * 2)", - new GeneralScalarExpression( - "*", - Array(LiteralValue(50, IntegerType), LiteralValue(2, IntegerType))), - LiteralValue(100, IntegerType)), - new ColumnDefaultValue( - "('un' || 'known')", - new GeneralScalarExpression( - "CONCAT", - Array( - LiteralValue(UTF8String.fromString("un"), StringType), - LiteralValue(UTF8String.fromString("known"), StringType))), - LiteralValue(UTF8String.fromString("unknown"), StringType)), - new ColumnDefaultValue( - "CAST(0 AS BOOLEAN)", - new V2Cast(LiteralValue(0, IntegerType), IntegerType, BooleanType), - LiteralValue(false, BooleanType)))) + checkDefaultValues( + replaceExec.columns, + Array( + null, + new ColumnDefaultValue( + "(50 * 2)", + new GeneralScalarExpression( + "*", + Array(LiteralValue(50, IntegerType), LiteralValue(2, IntegerType))), + LiteralValue(100, IntegerType)), + new ColumnDefaultValue( + "('un' || 'known')", + new GeneralScalarExpression( + "CONCAT", + Array( + LiteralValue(UTF8String.fromString("un"), StringType), + LiteralValue(UTF8String.fromString("known"), StringType))), + LiteralValue(UTF8String.fromString("unknown"), StringType)), + new ColumnDefaultValue( + "CAST(0 AS BOOLEAN)", + new V2Cast(LiteralValue(0, IntegerType), IntegerType, BooleanType), + LiteralValue(false, BooleanType)))) - val df3 = Seq(1).toDF("id") - df3.writeTo(tableName).append() + val df3 = Seq(1).toDF("id") + df3.writeTo(tableName).append() - checkAnswer( - sql(s"SELECT * FROM $tableName"), - Seq(Row(1, 100, "unknown", false))) - } + checkAnswer( + sql(s"SELECT * FROM $tableName"), + Seq(Row(1, 100, "unknown", false))) } } + test("alter table add column with complex foldable default values") { val tableName = "testcat.ns1.ns2.tbl" - withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { - withTable(tableName) { - sql( - s""" - |CREATE TABLE $tableName ( - | dummy INT - |) USING foo - |""".stripMargin) - - val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] { - sql(s"ALTER TABLE $tableName ADD COLUMNS (" + - s"salary INT DEFAULT (100 + 23), " + - s"dep STRING DEFAULT ('h' || 'r'), " + - s"active BOOLEAN DEFAULT CAST(1 AS BOOLEAN))") - } + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName ( + | dummy INT + |) USING foo + |""".stripMargin) - checkDefaultValues( - alterExec.changes.map(_.asInstanceOf[AddColumn]).toArray, - Array( - new ColumnDefaultValue( - "(100 + 23)", - new GeneralScalarExpression( - "+", - Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))), - LiteralValue(123, IntegerType)), - new ColumnDefaultValue( - "('h' || 'r')", - new GeneralScalarExpression( - "CONCAT", - Array( - LiteralValue(UTF8String.fromString("h"), StringType), - LiteralValue(UTF8String.fromString("r"), StringType))), - LiteralValue(UTF8String.fromString("hr"), StringType)), - new ColumnDefaultValue( - "CAST(1 AS BOOLEAN)", - new V2Cast(LiteralValue(1, IntegerType), IntegerType, BooleanType), - LiteralValue(true, BooleanType)))) + val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] { + sql(s"ALTER TABLE $tableName ADD COLUMNS (" + + s"salary INT DEFAULT (100 + 23), " + + s"dep STRING DEFAULT ('h' || 'r'), " + + s"active BOOLEAN DEFAULT CAST(1 AS BOOLEAN))") } + + checkDefaultValues( + alterExec.changes.map(_.asInstanceOf[AddColumn]).toArray, + Array( + new ColumnDefaultValue( + "(100 + 23)", + new GeneralScalarExpression( + "+", + Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))), + LiteralValue(123, IntegerType)), + new ColumnDefaultValue( + "('h' || 'r')", + new GeneralScalarExpression( + "CONCAT", + Array( + LiteralValue(UTF8String.fromString("h"), StringType), + LiteralValue(UTF8String.fromString("r"), StringType))), + LiteralValue(UTF8String.fromString("hr"), StringType)), + new ColumnDefaultValue( + "CAST(1 AS BOOLEAN)", + new V2Cast(LiteralValue(1, IntegerType), IntegerType, BooleanType), + LiteralValue(true, BooleanType)))) } } test("alter table alter column with complex foldable default values") { val tableName = "testcat.ns1.ns2.tbl" - withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { - withTable(tableName) { - sql( - s""" - |CREATE TABLE $tableName ( - | salary INT DEFAULT (100 + 23), - | dep STRING DEFAULT ('h' || 'r'), - | active BOOLEAN DEFAULT CAST(1 AS BOOLEAN) - |) USING foo - |""".stripMargin) + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName ( + | salary INT DEFAULT (100 + 23), + | dep STRING DEFAULT ('h' || 'r'), + | active BOOLEAN DEFAULT CAST(1 AS BOOLEAN) + |) USING foo + |""".stripMargin) - val alterExecCol1 = executeAndKeepPhysicalPlan[AlterTableExec] { - sql( - s""" - |ALTER TABLE $tableName ALTER COLUMN - | salary SET DEFAULT (123 + 56), - | dep SET DEFAULT ('r' || 'l'), - | active SET DEFAULT CAST(0 AS BOOLEAN) - |""".stripMargin) - } - checkDefaultValues( - alterExecCol1.changes.map(_.asInstanceOf[UpdateColumnDefaultValue]).toArray, - Array( - new DefaultValue( - "(123 + 56)", - new GeneralScalarExpression( - "+", - Array(LiteralValue(123, IntegerType), LiteralValue(56, IntegerType)))), - new DefaultValue( - "('r' || 'l')", - new GeneralScalarExpression( - "CONCAT", - Array( - LiteralValue(UTF8String.fromString("r"), StringType), - LiteralValue(UTF8String.fromString("l"), StringType)))), - new DefaultValue( - "CAST(0 AS BOOLEAN)", - new V2Cast(LiteralValue(0, IntegerType), IntegerType, BooleanType)))) + val alterExecCol1 = executeAndKeepPhysicalPlan[AlterTableExec] { + sql( + s""" + |ALTER TABLE $tableName ALTER COLUMN + | salary SET DEFAULT (123 + 56), + | dep SET DEFAULT ('r' || 'l'), + | active SET DEFAULT CAST(0 AS BOOLEAN) + |""".stripMargin) } + checkDefaultValues( + alterExecCol1.changes.map(_.asInstanceOf[UpdateColumnDefaultValue]).toArray, + Array( + new DefaultValue( + "(123 + 56)", + new GeneralScalarExpression( + "+", + Array(LiteralValue(123, IntegerType), LiteralValue(56, IntegerType)))), + new DefaultValue( + "('r' || 'l')", + new GeneralScalarExpression( + "CONCAT", + Array( + LiteralValue(UTF8String.fromString("r"), StringType), + LiteralValue(UTF8String.fromString("l"), StringType)))), + new DefaultValue( + "CAST(0 AS BOOLEAN)", + new V2Cast(LiteralValue(0, IntegerType), IntegerType, BooleanType)))) } } @@ -677,7 +671,6 @@ class DataSourceV2DataFrameSuite test("create/replace table default value expression should have a cast") { val tableName = "testcat.ns1.ns2.tbl" withTable(tableName) { - val createExec = executeAndKeepPhysicalPlan[CreateTableExec] { sql( s""" @@ -693,8 +686,8 @@ class DataSourceV2DataFrameSuite null, new ColumnDefaultValue( "'2018-11-17 13:33:33'", - new LiteralValue(1542490413000000L, TimestampType), - new LiteralValue(1542490413000000L, TimestampType)), + LiteralValue(1542490413000000L, TimestampType), + LiteralValue(1542490413000000L, TimestampType)), new ColumnDefaultValue( "1", new V2Cast(LiteralValue(1, IntegerType), IntegerType, DoubleType), @@ -728,53 +721,53 @@ class DataSourceV2DataFrameSuite } } + test("alter table default value expression should have a cast") { val tableName = "testcat.ns1.ns2.tbl" - withTable(tableName) { - - sql(s"CREATE TABLE $tableName (col1 int) using foo") - val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] { - sql( - s""" - |ALTER TABLE $tableName ADD COLUMNS ( - | col2 timestamp DEFAULT '2018-11-17 13:33:33', - | col3 double DEFAULT 1) - |""".stripMargin) - } + withTable(tableName) { + sql(s"CREATE TABLE $tableName (col1 int) using foo") + val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] { + sql( + s""" + |ALTER TABLE $tableName ADD COLUMNS ( + | col2 timestamp DEFAULT '2018-11-17 13:33:33', + | col3 double DEFAULT 1) + |""".stripMargin) + } - checkDefaultValues( - alterExec.changes.map(_.asInstanceOf[AddColumn]).toArray, - Array( - new ColumnDefaultValue( - "'2018-11-17 13:33:33'", - LiteralValue(1542490413000000L, TimestampType), - LiteralValue(1542490413000000L, TimestampType)), - new ColumnDefaultValue( - "1", - new V2Cast(LiteralValue(1, IntegerType), IntegerType, DoubleType), - LiteralValue(1.0, DoubleType)))) + checkDefaultValues( + alterExec.changes.map(_.asInstanceOf[AddColumn]).toArray, + Array( + new ColumnDefaultValue( + "'2018-11-17 13:33:33'", + LiteralValue(1542490413000000L, TimestampType), + LiteralValue(1542490413000000L, TimestampType)), + new ColumnDefaultValue( + "1", + new V2Cast(LiteralValue(1, IntegerType), IntegerType, DoubleType), + LiteralValue(1.0, DoubleType)))) - val alterCol1 = executeAndKeepPhysicalPlan[AlterTableExec] { - sql( - s""" - |ALTER TABLE $tableName ALTER COLUMN - | col2 SET DEFAULT '2022-02-23 05:55:55', - | col3 SET DEFAULT (1 + 1) - |""".stripMargin) + val alterCol1 = executeAndKeepPhysicalPlan[AlterTableExec] { + sql( + s""" + |ALTER TABLE $tableName ALTER COLUMN + | col2 SET DEFAULT '2022-02-23 05:55:55', + | col3 SET DEFAULT (1 + 1) + |""".stripMargin) + } + checkDefaultValues( + alterCol1.changes.map(_.asInstanceOf[UpdateColumnDefaultValue]).toArray, + Array( + new DefaultValue("'2022-02-23 05:55:55'", + LiteralValue(1645624555000000L, TimestampType)), + new DefaultValue( + "(1 + 1)", + new V2Cast( + new GeneralScalarExpression("+", Array(LiteralValue(1, IntegerType), + LiteralValue(1, IntegerType))), + IntegerType, + DoubleType)))) } - checkDefaultValues( - alterCol1.changes.map(_.asInstanceOf[UpdateColumnDefaultValue]).toArray, - Array( - new DefaultValue("'2022-02-23 05:55:55'", - LiteralValue(1645624555000000L, TimestampType)), - new DefaultValue( - "(1 + 1)", - new V2Cast( - new GeneralScalarExpression("+", Array(LiteralValue(1, IntegerType), - LiteralValue(1, IntegerType))), - IntegerType, - DoubleType)))) - } } test("write with supported expression-based default values") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 48ccf33faaa1c..cd4cd462088a0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -25,7 +25,7 @@ import java.util.Locale import scala.concurrent.duration.MICROSECONDS import scala.jdk.CollectionConverters._ -import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException} +import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUnsupportedOperationException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER @@ -57,6 +57,9 @@ abstract class DataSourceV2SQLSuite with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase with AdaptiveSparkPlanHelper { + override protected def sparkConf: SparkConf = + super.sparkConf.set(SQLConf.ANSI_ENABLED, true) + protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName override protected val v2Format = v2Source diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DatasourceV2SQLBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DatasourceV2SQLBase.scala index 4ccff44fa0674..683d814d28b43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DatasourceV2SQLBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DatasourceV2SQLBase.scala @@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.QueryTest import org.apache.spark.sql.connector.catalog.{CatalogPlugin, InMemoryCatalog, InMemoryPartitionTableCatalog, InMemoryTableWithV2FilterCatalog, StagingInMemoryTableCatalog} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession trait DatasourceV2SQLBase @@ -52,6 +53,7 @@ trait DatasourceV2SQLBase after { spark.sessionState.catalog.reset() spark.sessionState.catalogManager.reset() - spark.sessionState.conf.clear() + spark.sessionState.conf.unsetConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) + spark.sessionState.conf.unsetConf(SQLConf.DEFAULT_CATALOG) } }