diff --git a/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBConnection.java b/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBConnection.java index d12bea1..a5d517b 100644 --- a/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBConnection.java +++ b/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBConnection.java @@ -16,8 +16,11 @@ package org.flywaydb.community.database.postgresql.yugabytedb; import org.flywaydb.core.internal.database.base.Schema; +import org.flywaydb.core.internal.database.base.Table; import org.flywaydb.database.postgresql.PostgreSQLConnection; +import java.util.concurrent.Callable; + public class YugabyteDBConnection extends PostgreSQLConnection { YugabyteDBConnection(YugabyteDBDatabase database, java.sql.Connection connection) { @@ -28,4 +31,9 @@ public class YugabyteDBConnection extends PostgreSQLConnection { public Schema getSchema(String name) { return new YugabyteDBSchema(jdbcTemplate, (YugabyteDBDatabase) database, name); } + + @Override + public T lock(Table table, Callable callable) { + return new YugabyteDBExecutionTemplate(jdbcTemplate, table.toString()).execute(callable); + } } \ No newline at end of file diff --git a/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBDatabase.java b/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBDatabase.java index 4f1fbaf..01180b0 100644 --- a/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBDatabase.java +++ b/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBDatabase.java @@ -18,31 +18,33 @@ import lombok.CustomLog; import org.flywaydb.core.api.configuration.Configuration; import org.flywaydb.core.internal.database.base.Table; +import org.flywaydb.core.internal.exception.FlywaySqlException; import org.flywaydb.core.internal.jdbc.JdbcConnectionFactory; import org.flywaydb.core.internal.jdbc.StatementInterceptor; import org.flywaydb.database.postgresql.PostgreSQLDatabase; import java.sql.Connection; import java.sql.SQLException; -import java.sql.Statement; @CustomLog public class YugabyteDBDatabase extends PostgreSQLDatabase { + public static final String LOCK_TABLE_NAME = "YB_FLYWAY_LOCK_TABLE"; + /** + * This table is used to enforce locking through SELECT ... FOR UPDATE on a + * token row inserted in this table. The token row is inserted with the name + * of the Flyway's migration history table as a token for simplicity. + */ + private static final String CREATE_LOCK_TABLE_DDL = "CREATE TABLE IF NOT EXISTS " + LOCK_TABLE_NAME + " (table_name varchar PRIMARY KEY, locked bool)"; + public YugabyteDBDatabase(Configuration configuration, JdbcConnectionFactory jdbcConnectionFactory, StatementInterceptor statementInterceptor) { super(configuration, jdbcConnectionFactory, statementInterceptor); + createLockTable(); } @Override protected YugabyteDBConnection doGetConnection(Connection connection) { - Statement stmt = null; - try { - stmt = connection.createStatement(); - stmt.execute("set yb_silence_advisory_locks_not_supported_error=on;"); - } catch (SQLException throwable) { - LOG.error("Unable to set yb_silence_advisory_locks_not_supported_error ", throwable); - } return new YugabyteDBConnection(this, connection); } @@ -75,4 +77,25 @@ public String getRawCreateScript(Table table, boolean baseline) { "CREATE INDEX IF NOT EXISTS \"" + table.getName() + "_s_idx\" ON " + table + " (\"success\");"; } -} \ No newline at end of file + /** + * YugabyteDB does not support PG Advisor Locks. So the YugabyteDB plugin + * employs SELECT ... FOR UPDATE in a transaction to implement locking for + * Flyway operations instead of the PG Advisory locks. If a single + * connection is used, it may cause issues if multiple threads execute + * begin/commit on it for Flyway operations. Returning false from this + * method ensures the same connection is not used for migrations. + * @return false + */ + @Override + public boolean useSingleConnection() { + return false; + } + + private void createLockTable() { + try { + jdbcTemplate.execute(CREATE_LOCK_TABLE_DDL); + } catch (SQLException e) { + throw new FlywaySqlException("Unable to initialize the lock table", e); + } + } +} diff --git a/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBDatabaseType.java b/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBDatabaseType.java index afbfbe3..a45726e 100644 --- a/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBDatabaseType.java +++ b/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBDatabaseType.java @@ -67,4 +67,18 @@ public Parser createParser(Configuration configuration, ResourceProvider resourc public String getPluginVersion(Configuration config) { return YugabyteDBDatabaseExtension.readVersion(); } + + /** + * Returns the YugabyteDB Smart driver classname if the smart driver is + * being used. The plugin will work with the Postgresql JDBC driver also + * since the url in that case would start with 'jdbc:postgresql' which would + * return the PG JDBC driver class name. + * @param url + * @param classLoader + * @return "com.yugabyte.Driver" if url starts with "jdbc:yugabytedb:" + */ + @Override + public String getDriverClass(String url, ClassLoader classLoader) { + return url.startsWith("jdbc:yugabytedb:") ? "com.yugabyte.Driver" : super.getDriverClass(url, classLoader); + } } \ No newline at end of file diff --git a/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBExecutionTemplate.java b/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBExecutionTemplate.java new file mode 100644 index 0000000..7a08f9b --- /dev/null +++ b/flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBExecutionTemplate.java @@ -0,0 +1,167 @@ +package org.flywaydb.community.database.postgresql.yugabytedb; + +import lombok.CustomLog; +import org.flywaydb.core.api.FlywayException; +import org.flywaydb.core.internal.exception.FlywaySqlException; +import org.flywaydb.core.internal.jdbc.JdbcTemplate; +import org.flywaydb.core.internal.strategy.RetryStrategy; +import org.flywaydb.core.internal.util.FlywayDbWebsiteLinks; + +import java.sql.*; +import java.util.HashMap; +import java.util.concurrent.Callable; + +@CustomLog +public class YugabyteDBExecutionTemplate { + + private final JdbcTemplate jdbcTemplate; + private final String tableName; + private final HashMap tableEntries = new HashMap<>(); + + + YugabyteDBExecutionTemplate(JdbcTemplate jdbcTemplate, String tableName) { + this.jdbcTemplate = jdbcTemplate; + this.tableName = tableName; + } + + public T execute(Callable callable) { + Exception error = null; + try { + lock(); + return callable.call(); + } catch (RuntimeException e) { + error = e; + throw e; + } catch (Exception e) { + error = e; + throw new FlywayException(e); + } finally { + unlock(error); + } + } + + private void lock() throws SQLException { + RetryStrategy strategy = new RetryStrategy(); + strategy.doWithRetries(this::tryLock, "Interrupted while attempting to acquire lock through SELECT ... FOR UPDATE", + "Number of retries exceeded while attempting to acquire lock through SELECT ... FOR UPDATE. " + + "Configure the number of retries with the 'lockRetryCount' configuration option: " + FlywayDbWebsiteLinks.LOCK_RETRY_COUNT); + + } + + private boolean tryLock() { + Exception exception = null; + boolean txStarted = false, success = false; + Statement statement = null; + try { + statement = jdbcTemplate.getConnection().createStatement(); + + if (!tableEntries.containsKey(tableName)) { + try { + statement.executeUpdate("INSERT INTO " + + YugabyteDBDatabase.LOCK_TABLE_NAME + + " VALUES ('" + tableName + "', 'false')"); + tableEntries.put(tableName, true); + LOG.info(Thread.currentThread().getName() + "> Inserted a token row for " + tableName + " in " + YugabyteDBDatabase.LOCK_TABLE_NAME); + } catch (SQLException e) { + if ("23505".equals(e.getSQLState())) { + // 23505 == UNIQUE_VIOLATION + LOG.debug(Thread.currentThread().getName() + "> Token row already added for " + tableName); + } else { + throw new FlywaySqlException("Could not add token row for " + tableName + " in table " + YugabyteDBDatabase.LOCK_TABLE_NAME, e); + } + } + } + + boolean locked; + String selectForUpdate = "SELECT locked FROM " + + YugabyteDBDatabase.LOCK_TABLE_NAME + + " WHERE table_name = '" + + tableName + + "' FOR UPDATE"; + String updateLocked = "UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME + + " SET locked = true WHERE table_name = '" + + tableName + "'"; + + statement.execute("BEGIN"); + txStarted = true; + ResultSet rs = statement.executeQuery(selectForUpdate); + if (rs.next()) { + locked = rs.getBoolean("locked"); + + if (locked) { + statement.execute("COMMIT"); + txStarted = false; + LOG.debug(Thread.currentThread().getName() + "> Another Flyway operation is in progress. Allowing it to complete"); + } else { + LOG.debug(Thread.currentThread().getName() + "> Setting locked = true"); + statement.executeUpdate(updateLocked); + success = true; + } + } else { + // For some reason the record was not found, retry + tableEntries.remove(tableName); + } + + } catch (SQLException e) { + LOG.warn(Thread.currentThread().getName() + "> Unable to perform lock action, SQLState: " + e.getSQLState()); + if (!"40001".equalsIgnoreCase(e.getSQLState())) { + exception = new FlywaySqlException("Unable to perform lock action", e); + throw (FlywaySqlException) exception; + } // else retry + } finally { + if (txStarted) { + try { + statement.execute("COMMIT"); + LOG.debug(Thread.currentThread().getName() + "> Completed the tx to set locked = true"); + } catch (SQLException e) { + if (exception == null) { + throw new FlywaySqlException("Failed to commit the tx to set locked = true", e); + } + LOG.warn(Thread.currentThread().getName() + "> Failed to commit the tx to set locked = true: " + e); + } + } + } + return success; + } + + private void unlock(Exception rethrow) { + Statement statement = null; + try { + statement = jdbcTemplate.getConnection().createStatement(); + statement.execute("BEGIN"); + ResultSet rs = statement.executeQuery("SELECT locked FROM " + YugabyteDBDatabase.LOCK_TABLE_NAME + " WHERE table_name = '" + tableName + "' FOR UPDATE"); + + if (rs.next()) { + boolean locked = rs.getBoolean("locked"); + if (locked) { + statement.executeUpdate("UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME + " SET locked = false WHERE table_name = '" + tableName + "'"); + } else { + // Unexpected. This may happen only when callable took too long to complete + // and another thread forcefully reset it. + String msg = "Unlock failed but the Flyway operation may have succeeded. Check your Flyway operation before re-trying"; + LOG.warn(Thread.currentThread().getName() + "> " + msg); + if (rethrow == null) { + throw new FlywayException(msg); + } + } + } + } catch (SQLException e) { + if (rethrow == null) { + rethrow = new FlywayException("Unable to perform unlock action", e); + throw (FlywaySqlException) rethrow; + } + LOG.warn("Unable to perform unlock action " + e); + } finally { + try { + statement.execute("COMMIT"); + LOG.debug(Thread.currentThread().getName() + "> Completed the tx to set locked = false"); + } catch (SQLException e) { + if (rethrow == null) { + throw new FlywaySqlException("Failed to commit unlock action", e); + } + LOG.warn("Failed to commit unlock action: " + e); + } + } + } + +}