Skip to content

Workaround for absence of advisory locks in YugabyteDB #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package org.flywaydb.community.database.postgresql.yugabytedb;

import org.flywaydb.core.internal.database.base.Schema;
import org.flywaydb.core.internal.database.base.Table;
import org.flywaydb.database.postgresql.PostgreSQLConnection;

import java.util.concurrent.Callable;

public class YugabyteDBConnection extends PostgreSQLConnection {

YugabyteDBConnection(YugabyteDBDatabase database, java.sql.Connection connection) {
Expand All @@ -28,4 +31,9 @@ public class YugabyteDBConnection extends PostgreSQLConnection {
public Schema getSchema(String name) {
return new YugabyteDBSchema(jdbcTemplate, (YugabyteDBDatabase) database, name);
}

@Override
public <T> T lock(Table table, Callable<T> callable) {
return new YugabyteDBExecutionTemplate(jdbcTemplate, table.toString()).execute(callable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,33 @@
import lombok.CustomLog;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.internal.database.base.Table;
import org.flywaydb.core.internal.exception.FlywaySqlException;
import org.flywaydb.core.internal.jdbc.JdbcConnectionFactory;
import org.flywaydb.core.internal.jdbc.StatementInterceptor;
import org.flywaydb.database.postgresql.PostgreSQLDatabase;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;


@CustomLog
public class YugabyteDBDatabase extends PostgreSQLDatabase {

public static final String LOCK_TABLE_NAME = "YB_FLYWAY_LOCK_TABLE";
/**
* This table is used to enforce locking through SELECT ... FOR UPDATE on a
* token row inserted in this table. The token row is inserted with the name
* of the Flyway's migration history table as a token for simplicity.
*/
private static final String CREATE_LOCK_TABLE_DDL = "CREATE TABLE IF NOT EXISTS " + LOCK_TABLE_NAME + " (table_name varchar PRIMARY KEY, locked bool)";

public YugabyteDBDatabase(Configuration configuration, JdbcConnectionFactory jdbcConnectionFactory, StatementInterceptor statementInterceptor) {
super(configuration, jdbcConnectionFactory, statementInterceptor);
createLockTable();
}

@Override
protected YugabyteDBConnection doGetConnection(Connection connection) {
Statement stmt = null;
try {
stmt = connection.createStatement();
stmt.execute("set yb_silence_advisory_locks_not_supported_error=on;");
} catch (SQLException throwable) {
LOG.error("Unable to set yb_silence_advisory_locks_not_supported_error ", throwable);
}
return new YugabyteDBConnection(this, connection);
}

Expand Down Expand Up @@ -75,4 +77,25 @@ public String getRawCreateScript(Table table, boolean baseline) {
"CREATE INDEX IF NOT EXISTS \"" + table.getName() + "_s_idx\" ON " + table + " (\"success\");";
}

}
/**
* YugabyteDB does not support PG Advisor Locks. So the YugabyteDB plugin
* employs SELECT ... FOR UPDATE in a transaction to implement locking for
* Flyway operations instead of the PG Advisory locks. If a single
* connection is used, it may cause issues if multiple threads execute
* begin/commit on it for Flyway operations. Returning false from this
* method ensures the same connection is not used for migrations.
* @return false
*/
@Override
public boolean useSingleConnection() {
return false;
}

private void createLockTable() {
try {
jdbcTemplate.execute(CREATE_LOCK_TABLE_DDL);
} catch (SQLException e) {
throw new FlywaySqlException("Unable to initialize the lock table", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,18 @@ public Parser createParser(Configuration configuration, ResourceProvider resourc
public String getPluginVersion(Configuration config) {
return YugabyteDBDatabaseExtension.readVersion();
}

/**
* Returns the YugabyteDB Smart driver classname if the smart driver is
* being used. The plugin will work with the Postgresql JDBC driver also
* since the url in that case would start with 'jdbc:postgresql' which would
* return the PG JDBC driver class name.
* @param url
* @param classLoader
* @return "com.yugabyte.Driver" if url starts with "jdbc:yugabytedb:"
*/
@Override
public String getDriverClass(String url, ClassLoader classLoader) {
return url.startsWith("jdbc:yugabytedb:") ? "com.yugabyte.Driver" : super.getDriverClass(url, classLoader);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package org.flywaydb.community.database.postgresql.yugabytedb;

import lombok.CustomLog;
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.internal.exception.FlywaySqlException;
import org.flywaydb.core.internal.jdbc.JdbcTemplate;
import org.flywaydb.core.internal.strategy.RetryStrategy;
import org.flywaydb.core.internal.util.FlywayDbWebsiteLinks;

import java.sql.*;
import java.util.HashMap;
import java.util.concurrent.Callable;

@CustomLog
public class YugabyteDBExecutionTemplate {

private final JdbcTemplate jdbcTemplate;
private final String tableName;
private final HashMap<String, Boolean> tableEntries = new HashMap<>();


YugabyteDBExecutionTemplate(JdbcTemplate jdbcTemplate, String tableName) {
this.jdbcTemplate = jdbcTemplate;
this.tableName = tableName;
}

public <T> T execute(Callable<T> callable) {
Exception error = null;
try {
lock();
return callable.call();
} catch (RuntimeException e) {
error = e;
throw e;
} catch (Exception e) {
error = e;
throw new FlywayException(e);
} finally {
unlock(error);
}
}

private void lock() throws SQLException {
RetryStrategy strategy = new RetryStrategy();
strategy.doWithRetries(this::tryLock, "Interrupted while attempting to acquire lock through SELECT ... FOR UPDATE",
"Number of retries exceeded while attempting to acquire lock through SELECT ... FOR UPDATE. " +
"Configure the number of retries with the 'lockRetryCount' configuration option: " + FlywayDbWebsiteLinks.LOCK_RETRY_COUNT);

}

private boolean tryLock() {
Exception exception = null;
boolean txStarted = false, success = false;
Statement statement = null;
try {
statement = jdbcTemplate.getConnection().createStatement();

if (!tableEntries.containsKey(tableName)) {
try {
statement.executeUpdate("INSERT INTO "
+ YugabyteDBDatabase.LOCK_TABLE_NAME
+ " VALUES ('" + tableName + "', 'false')");
tableEntries.put(tableName, true);
LOG.info(Thread.currentThread().getName() + "> Inserted a token row for " + tableName + " in " + YugabyteDBDatabase.LOCK_TABLE_NAME);
} catch (SQLException e) {
if ("23505".equals(e.getSQLState())) {
// 23505 == UNIQUE_VIOLATION
LOG.debug(Thread.currentThread().getName() + "> Token row already added for " + tableName);
} else {
throw new FlywaySqlException("Could not add token row for " + tableName + " in table " + YugabyteDBDatabase.LOCK_TABLE_NAME, e);
}
}
}

boolean locked;
String selectForUpdate = "SELECT locked FROM "
+ YugabyteDBDatabase.LOCK_TABLE_NAME
+ " WHERE table_name = '"
+ tableName
+ "' FOR UPDATE";
String updateLocked = "UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME
+ " SET locked = true WHERE table_name = '"
+ tableName + "'";

statement.execute("BEGIN");
txStarted = true;
ResultSet rs = statement.executeQuery(selectForUpdate);
if (rs.next()) {
locked = rs.getBoolean("locked");

if (locked) {
statement.execute("COMMIT");
txStarted = false;
LOG.debug(Thread.currentThread().getName() + "> Another Flyway operation is in progress. Allowing it to complete");
} else {
LOG.debug(Thread.currentThread().getName() + "> Setting locked = true");
statement.executeUpdate(updateLocked);
success = true;
}
} else {
// For some reason the record was not found, retry
tableEntries.remove(tableName);
}

} catch (SQLException e) {
LOG.warn(Thread.currentThread().getName() + "> Unable to perform lock action, SQLState: " + e.getSQLState());
if (!"40001".equalsIgnoreCase(e.getSQLState())) {
exception = new FlywaySqlException("Unable to perform lock action", e);
throw (FlywaySqlException) exception;
} // else retry
} finally {
if (txStarted) {
try {
statement.execute("COMMIT");
LOG.debug(Thread.currentThread().getName() + "> Completed the tx to set locked = true");
} catch (SQLException e) {
if (exception == null) {
throw new FlywaySqlException("Failed to commit the tx to set locked = true", e);
}
LOG.warn(Thread.currentThread().getName() + "> Failed to commit the tx to set locked = true: " + e);
}
}
}
return success;
}

private void unlock(Exception rethrow) {
Statement statement = null;
try {
statement = jdbcTemplate.getConnection().createStatement();
statement.execute("BEGIN");
ResultSet rs = statement.executeQuery("SELECT locked FROM " + YugabyteDBDatabase.LOCK_TABLE_NAME + " WHERE table_name = '" + tableName + "' FOR UPDATE");

if (rs.next()) {
boolean locked = rs.getBoolean("locked");
if (locked) {
statement.executeUpdate("UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME + " SET locked = false WHERE table_name = '" + tableName + "'");
} else {
// Unexpected. This may happen only when callable took too long to complete
// and another thread forcefully reset it.
String msg = "Unlock failed but the Flyway operation may have succeeded. Check your Flyway operation before re-trying";
LOG.warn(Thread.currentThread().getName() + "> " + msg);
if (rethrow == null) {
throw new FlywayException(msg);
}
}
}
} catch (SQLException e) {
if (rethrow == null) {
rethrow = new FlywayException("Unable to perform unlock action", e);
throw (FlywaySqlException) rethrow;
}
LOG.warn("Unable to perform unlock action " + e);
} finally {
try {
statement.execute("COMMIT");
LOG.debug(Thread.currentThread().getName() + "> Completed the tx to set locked = false");
} catch (SQLException e) {
if (rethrow == null) {
throw new FlywaySqlException("Failed to commit unlock action", e);
}
LOG.warn("Failed to commit unlock action: " + e);
}
}
}

}
Loading