Skip to content

Commit 7d40760

Browse files
Merge pull request #26 from tdimhcsleumas/main
Databricks Support
2 parents 6be5b41 + 3506c31 commit 7d40760

File tree

13 files changed

+498
-1
lines changed

13 files changed

+498
-1
lines changed

flyway-database-databricks/pom.xml

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright (C) Red Gate Software Ltd 2010-2024
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0"
20+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22+
<modelVersion>4.0.0</modelVersion>
23+
<parent>
24+
<groupId>org.flywaydb</groupId>
25+
<artifactId>flyway-community-db-support</artifactId>
26+
<version>10.11.0</version>
27+
</parent>
28+
29+
<artifactId>flyway-database-databricks</artifactId>
30+
<name>${project.artifactId}</name>
31+
32+
<dependencies>
33+
<dependency>
34+
<groupId>${project.groupId}</groupId>
35+
<artifactId>flyway-core</artifactId>
36+
</dependency>
37+
<dependency>
38+
<groupId>com.databricks</groupId>
39+
<artifactId>databricks-jdbc</artifactId>
40+
<version>2.6.33</version>
41+
<optional>true</optional>
42+
</dependency>
43+
</dependencies>
44+
45+
<build>
46+
<resources>
47+
<resource>
48+
<directory>src/main/resources</directory>
49+
<filtering>true</filtering>
50+
</resource>
51+
</resources>
52+
<plugins>
53+
<plugin>
54+
<artifactId>maven-resources-plugin</artifactId>
55+
</plugin>
56+
<plugin>
57+
<artifactId>maven-jar-plugin</artifactId>
58+
</plugin>
59+
</plugins>
60+
</build>
61+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (C) Red Gate Software Ltd 2010-2024
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.flywaydb.community.database;
17+
18+
import org.flywaydb.core.api.FlywayException;
19+
import org.flywaydb.core.extensibility.PluginMetadata;
20+
import org.flywaydb.core.internal.util.FileUtils;
21+
22+
import java.io.IOException;
23+
import java.nio.charset.StandardCharsets;
24+
25+
public class DatabricksDatabaseExtension implements PluginMetadata {
26+
public String getDescription() {
27+
return "Community-contributed Databricks database support extension " + readVersion() + " by Redgate";
28+
}
29+
30+
public static String readVersion() {
31+
try {
32+
return FileUtils.copyToString(
33+
DatabricksDatabaseExtension.class.getClassLoader().getResourceAsStream("org/flywaydb/community/database/databricks/version.txt"),
34+
StandardCharsets.UTF_8);
35+
} catch (IOException e) {
36+
throw new FlywayException("Unable to read extension version: " + e.getMessage(), e);
37+
}
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.flywaydb.community.database.databricks;
2+
3+
import org.flywaydb.core.api.FlywayException;
4+
import org.flywaydb.core.internal.database.base.Connection;
5+
import org.flywaydb.core.internal.database.base.Schema;
6+
import org.flywaydb.core.internal.util.StringUtils;
7+
8+
import java.sql.SQLException;
9+
10+
public class DatabricksConnection extends Connection<DatabricksDatabase> {
11+
protected DatabricksConnection(DatabricksDatabase database, java.sql.Connection connection) {
12+
super(database, connection);
13+
}
14+
15+
@Override
16+
protected String getCurrentSchemaNameOrSearchPath() throws SQLException {
17+
String defaultSchema = "default";
18+
String currentSchema = jdbcTemplate.queryForString("SELECT current_schema();");
19+
return (currentSchema != null) ? currentSchema : defaultSchema;
20+
}
21+
22+
@Override
23+
public void doChangeCurrentSchemaOrSearchPathTo(String schema) throws SQLException {
24+
String sql = "USE SCHEMA" + database.doQuote(schema) + ";";
25+
jdbcTemplate.execute(sql);
26+
}
27+
28+
@Override
29+
public Schema doGetCurrentSchema() throws SQLException {
30+
String currentSchema = getCurrentSchemaNameOrSearchPath();
31+
32+
if (!StringUtils.hasText(currentSchema)) {
33+
throw new FlywayException("Unable to determine current schema as currentSchema is empty.");
34+
}
35+
36+
return getSchema(currentSchema);
37+
}
38+
39+
@Override
40+
public Schema getSchema(String name) {
41+
return new DatabricksSchema(jdbcTemplate, database, name);
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package org.flywaydb.community.database.databricks;
2+
3+
import org.flywaydb.core.api.configuration.Configuration;
4+
import org.flywaydb.core.internal.database.base.Database;
5+
import org.flywaydb.core.internal.database.base.Table;
6+
import org.flywaydb.core.internal.jdbc.JdbcConnectionFactory;
7+
import org.flywaydb.core.internal.jdbc.StatementInterceptor;
8+
import org.flywaydb.core.internal.util.StringUtils;
9+
10+
import java.sql.Connection;
11+
import java.sql.SQLException;
12+
13+
public class DatabricksDatabase extends Database<DatabricksConnection> {
14+
public DatabricksDatabase(Configuration configuration, JdbcConnectionFactory jdbcConnectionFactory, StatementInterceptor statementInterceptor) {
15+
super(configuration, jdbcConnectionFactory, statementInterceptor);
16+
}
17+
18+
@Override
19+
protected DatabricksConnection doGetConnection(Connection connection) {
20+
return new DatabricksConnection(this, connection);
21+
}
22+
23+
@Override
24+
protected String doGetCurrentUser() throws SQLException {
25+
return getMainConnection().getJdbcTemplate().queryForString("SELECT current_user() as user;");
26+
}
27+
28+
@Override
29+
public void ensureSupported(Configuration configuration) {
30+
// Always latest Databricks version.
31+
}
32+
33+
@Override
34+
public boolean supportsDdlTransactions() {
35+
// Databricks is non-transactional
36+
return false;
37+
}
38+
39+
@Override
40+
public String getBooleanTrue() {
41+
return "TRUE";
42+
}
43+
44+
@Override
45+
public String getBooleanFalse() {
46+
return "FALSE";
47+
}
48+
49+
@Override
50+
public boolean catalogIsSchema() {
51+
return false;
52+
}
53+
54+
@Override
55+
public boolean supportsMultiStatementTransactions() {
56+
return false;
57+
}
58+
59+
@Override
60+
public boolean useSingleConnection() {
61+
return true;
62+
}
63+
64+
@Override
65+
public String doQuote(String identifier) {
66+
return getOpenQuote() + StringUtils.replaceAll(identifier, getCloseQuote(), getEscapedQuote()) + getCloseQuote();
67+
}
68+
69+
@Override
70+
protected String getOpenQuote() {
71+
return "`";
72+
}
73+
74+
@Override
75+
protected String getCloseQuote() {
76+
return "`";
77+
}
78+
79+
@Override
80+
public String getEscapedQuote() {
81+
return "\\`";
82+
}
83+
84+
@Override
85+
public String getRawCreateScript(Table table, boolean baseline) {
86+
String sql = "CREATE TABLE " + table + " (\n" +
87+
" `installed_rank` INT NOT NULL,\n" +
88+
" `version` STRING,\n" +
89+
" `description` STRING NOT NULL,\n" +
90+
" `type` STRING NOT NULL,\n" +
91+
" `script` STRING NOT NULL,\n" +
92+
" `checksum` INT,\n" +
93+
" `installed_by` STRING NOT NULL,\n" +
94+
" `installed_on` TIMESTAMP NOT NULL,\n" +
95+
" `execution_time` INT NOT NULL,\n" +
96+
" `success` BOOLEAN NOT NULL\n" +
97+
");\n" +
98+
(baseline ? getBaselineStatement(table) + ";\n" : "");
99+
return sql;
100+
}
101+
102+
@Override
103+
public String getInsertStatement(Table table) {
104+
// Explicitly set installed_on to CURRENT_TIMESTAMP().
105+
return "INSERT INTO " + table
106+
+ " (" + quote("installed_rank")
107+
+ ", " + quote("version")
108+
+ ", " + quote("description")
109+
+ ", " + quote("type")
110+
+ ", " + quote("script")
111+
+ ", " + quote("checksum")
112+
+ ", " + quote("installed_by")
113+
+ ", " + quote("installed_on")
114+
+ ", " + quote("execution_time")
115+
+ ", " + quote("success")
116+
+ ")"
117+
+ " VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP(), ?, ?)";
118+
}
119+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package org.flywaydb.community.database.databricks;
2+
3+
import org.flywaydb.core.api.ResourceProvider;
4+
import org.flywaydb.core.api.configuration.Configuration;
5+
import org.flywaydb.core.internal.database.base.BaseDatabaseType;
6+
import org.flywaydb.core.internal.database.base.CommunityDatabaseType;
7+
import org.flywaydb.core.internal.database.base.Database;
8+
import org.flywaydb.core.internal.jdbc.JdbcConnectionFactory;
9+
import org.flywaydb.core.internal.jdbc.StatementInterceptor;
10+
import org.flywaydb.core.internal.parser.Parser;
11+
import org.flywaydb.core.internal.parser.ParsingContext;
12+
import org.flywaydb.core.internal.util.ClassUtils;
13+
14+
import java.sql.Connection;
15+
import java.sql.Types;
16+
import java.util.Properties;
17+
18+
public class DatabricksDatabaseType extends BaseDatabaseType implements CommunityDatabaseType {
19+
private static final String DATABRICKS_JDBC_DRIVER = "com.databricks.client.jdbc.Driver";
20+
private static final String DATABRICKS_JDBC41_DRIVER = "com.databricks.client.jdbc41.Driver";
21+
22+
@Override
23+
public String getName() {
24+
return "Databricks";
25+
}
26+
27+
@Override
28+
public int getNullType() {
29+
return Types.VARCHAR;
30+
}
31+
32+
@Override
33+
public boolean handlesJDBCUrl(String url) {
34+
return url.startsWith("jdbc:databricks:");
35+
}
36+
37+
@Override
38+
public String getDriverClass(String url, ClassLoader classLoader) {
39+
return "com.databricks.client.jdbc42.Driver";
40+
}
41+
42+
@Override
43+
public String getBackupDriverClass(String url, ClassLoader classLoader) {
44+
if (ClassUtils.isPresent(DATABRICKS_JDBC41_DRIVER, classLoader)) {
45+
return DATABRICKS_JDBC41_DRIVER;
46+
}
47+
return DATABRICKS_JDBC_DRIVER;
48+
}
49+
50+
@Override
51+
public boolean handlesDatabaseProductNameAndVersion(String databaseProductName, String databaseProductVersion, Connection connection) {
52+
return databaseProductName.startsWith("SparkSQL");
53+
}
54+
55+
@Override
56+
public Database createDatabase(Configuration configuration, JdbcConnectionFactory jdbcConnectionFactory, StatementInterceptor statementInterceptor) {
57+
return new DatabricksDatabase(configuration, jdbcConnectionFactory, statementInterceptor);
58+
}
59+
60+
@Override
61+
public Parser createParser(Configuration configuration, ResourceProvider resourceProvider, ParsingContext parsingContext) {
62+
return new DatabricksParser(configuration, parsingContext);
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.flywaydb.community.database.databricks;
2+
3+
import org.flywaydb.core.api.configuration.Configuration;
4+
import org.flywaydb.core.internal.parser.Parser;
5+
import org.flywaydb.core.internal.parser.ParsingContext;
6+
7+
public class DatabricksParser extends Parser {
8+
protected DatabricksParser(Configuration configuration, ParsingContext parsingContext) {
9+
super(configuration, parsingContext, 3);
10+
}
11+
}

0 commit comments

Comments
 (0)