From f972d19e40e57c96c4b7cc06be986563594c7458 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 6 Jul 2025 12:16:53 +0200 Subject: [PATCH 1/2] Add SQLite based cache store [ci fast] Signed-off-by: Paolo Di Tommaso --- modules/nextflow/build.gradle | 1 + .../nextflow/cache/DefaultCacheFactory.groovy | 12 +- .../cache/sqlite/SQLiteCacheStore.groovy | 341 +++++++++++++++++ .../cache/sqlite/SQLiteCacheStoreTest.groovy | 345 ++++++++++++++++++ 4 files changed, 698 insertions(+), 1 deletion(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/cache/sqlite/SQLiteCacheStore.groovy create mode 100644 modules/nextflow/src/test/groovy/nextflow/cache/sqlite/SQLiteCacheStoreTest.groovy diff --git a/modules/nextflow/build.gradle b/modules/nextflow/build.gradle index a62e9ac0fb..e5ceb83b98 100644 --- a/modules/nextflow/build.gradle +++ b/modules/nextflow/build.gradle @@ -44,6 +44,7 @@ dependencies { api "com.beust:jcommander:1.35" api("com.esotericsoftware.kryo:kryo:2.24.0") { exclude group: 'com.esotericsoftware.minlog', module: 'minlog' } api('org.iq80.leveldb:leveldb:0.12') + api('org.xerial:sqlite-jdbc:3.46.1.3') api('org.eclipse.jgit:org.eclipse.jgit:7.1.1.202505221757-r') api ('javax.activation:activation:1.1.1') api ('javax.mail:mail:1.4.7') diff --git a/modules/nextflow/src/main/groovy/nextflow/cache/DefaultCacheFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/cache/DefaultCacheFactory.groovy index 8f1f2682d9..37fe5717b9 100644 --- a/modules/nextflow/src/main/groovy/nextflow/cache/DefaultCacheFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/cache/DefaultCacheFactory.groovy @@ -20,6 +20,9 @@ package nextflow.cache import java.nio.file.Path import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.SysEnv +import nextflow.cache.sqlite.SQLiteCacheStore import nextflow.exception.AbortOperationException import nextflow.plugin.Priority @@ -30,6 +33,7 @@ import nextflow.plugin.Priority * * @author Paolo Di Tommaso */ +@Slf4j @CompileStatic @Priority(0) class DefaultCacheFactory extends CacheFactory { @@ -38,7 +42,13 @@ class DefaultCacheFactory extends CacheFactory { protected CacheDB newInstance(UUID uniqueId, String runName, Path home) { if( !uniqueId ) throw new AbortOperationException("Missing cache `uuid`") if( !runName ) throw new AbortOperationException("Missing cache `runName`") - final store = new DefaultCacheStore(uniqueId, runName, home) + final provider = SysEnv.get('NXF_CACHE_PROVIDER') + final store = switch (provider) { + case 'sqlite' -> new SQLiteCacheStore(uniqueId, runName, home) + case null -> new DefaultCacheStore(uniqueId, runName, home) + default -> throw new IllegalArgumentException("Unknown cache provider: $provider") + } + log.debug "Use nextflow cache ${store}" return new CacheDB(store) } diff --git a/modules/nextflow/src/main/groovy/nextflow/cache/sqlite/SQLiteCacheStore.groovy b/modules/nextflow/src/main/groovy/nextflow/cache/sqlite/SQLiteCacheStore.groovy new file mode 100644 index 0000000000..5affcac26f --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/cache/sqlite/SQLiteCacheStore.groovy @@ -0,0 +1,341 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cache.sqlite + +import java.nio.file.Path +import java.sql.Connection +import java.sql.DriverManager +import java.sql.PreparedStatement +import java.sql.ResultSet +import java.sql.SQLException + +import com.google.common.hash.HashCode +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.Const +import nextflow.cache.CacheStore +import nextflow.exception.AbortOperationException +import nextflow.util.CacheHelper + +/** + * SQLite-based implementation of the Nextflow cache store + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +class SQLiteCacheStore implements CacheStore { + + /** The underlying SQLite connection */ + private Connection connection + + /** The session UUID */ + private UUID uniqueId + + /** The unique run name associated to this cache instance */ + private String runName + + /** The base folder against which the cache is located. Default: current working directory */ + private Path baseDir + + /** The actual path where DB file is located */ + private Path dataDir + + /** The path to the database file */ + private Path dbFile + + /** Database connection synchronization object */ + private final Object connectionLock = new Object() + + SQLiteCacheStore(UUID uniqueId, String runName, Path home=null) { + this.uniqueId = uniqueId + this.runName = runName + this.baseDir = home ?: Const.appCacheDir.toAbsolutePath() + this.dataDir = baseDir.resolve("cache/$uniqueId") + this.dbFile = dataDir.resolve("cache.db") + } + + private void openDb() { + // make sure the db path exists + dataDir.mkdirs() + + try { + // Initialize SQLite JDBC driver + Class.forName("org.sqlite.JDBC") + + // Open connection to SQLite database with better configuration + def url = "jdbc:sqlite:${dbFile.toString()}" + connection = DriverManager.getConnection(url) + + // Disable autocommit for better control + connection.setAutoCommit(true) + + // Configure SQLite for better performance and concurrency + def stmt = connection.createStatement() + stmt.execute("PRAGMA journal_mode=WAL;") + stmt.execute("PRAGMA synchronous=NORMAL;") + stmt.execute("PRAGMA cache_size=10000;") + stmt.execute("PRAGMA temp_store=MEMORY;") + stmt.execute("PRAGMA busy_timeout=30000;") // 30 seconds timeout + stmt.close() + + // Create tables if they don't exist + createTables() + + } catch (Exception e) { + String msg + if (e.message?.contains('database is locked') || e.message?.contains('SQLITE_BUSY')) { + msg = "Unable to acquire lock on session with ID $uniqueId" + msg += "\n\n" + msg += "Common reasons for this error are:" + msg += "\n - You are trying to resume the execution of an already running pipeline" + msg += "\n - A previous execution was abruptly interrupted, leaving the session open" + msg += '\n' + msg += '\nYou can check for running processes that might be holding the database lock.' + throw new SQLException(msg) + } else { + msg = "Can't open cache DB: $dbFile" + msg += '\n\n' + msg += "Nextflow needs to be executed in a shared file system that supports file locks.\n" + msg += "Alternatively, you can run it in a local directory and specify the shared work\n" + msg += "directory by using the `-w` command line option." + throw new SQLException(msg, e) + } + } + } + + private void createTables() { + connection.createStatement().execute(""" + CREATE TABLE IF NOT EXISTS cache_entries ( + key BLOB PRIMARY KEY, + value BLOB + ) + """) + + connection.createStatement().execute(""" + CREATE TABLE IF NOT EXISTS cache_index ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + run_name TEXT NOT NULL, + key BLOB NOT NULL, + cached BOOLEAN NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + connection.createStatement().execute(""" + CREATE INDEX IF NOT EXISTS idx_cache_index_run_name ON cache_index(run_name) + """) + } + + + @Override + SQLiteCacheStore open() { + openDb() + // Clear previous index entries for this run + connection.createStatement().execute("DELETE FROM cache_index WHERE run_name = '${runName}'") + return this + } + + @Override + synchronized SQLiteCacheStore openForRead() { + openDb() + // Check if index exists for this run + PreparedStatement stmt = null + ResultSet rs = null + try { + stmt = connection.prepareStatement("SELECT COUNT(*) FROM cache_index WHERE run_name = ?") + stmt.setString(1, runName) + rs = stmt.executeQuery() + rs.next() + if (rs.getInt(1) == 0) { + throw new AbortOperationException("Missing cache index for run: $runName") + } + return this + } finally { + try { rs?.close() } catch (SQLException e) { /* ignore */ } + try { stmt?.close() } catch (SQLException e) { /* ignore */ } + } + } + + @Override + void drop() { + // Close connection first if it exists + close() + // Delete the entire data directory + dataDir.deleteDir() + } + + @Override + synchronized void close() { + try { + connection?.close() + } catch (SQLException e) { + log.warn("Error closing SQLite cache store connection", e) + } + } + + @Override + synchronized void writeIndex(HashCode key, boolean cached) { + PreparedStatement stmt = null + try { + stmt = connection.prepareStatement("INSERT INTO cache_index (run_name, key, cached) VALUES (?, ?, ?)") + stmt.setString(1, runName) + stmt.setBytes(2, key.asBytes()) + stmt.setBoolean(3, cached) + stmt.executeUpdate() + } catch (SQLException e) { + log.error("Error writing cache index", e) + throw new RuntimeException("Failed to write cache index", e) + } finally { + try { stmt?.close() } catch (SQLException e) { /* ignore */ } + } + } + + @Override + synchronized void deleteIndex() { + PreparedStatement stmt = null + try { + stmt = connection.prepareStatement("DELETE FROM cache_index WHERE run_name = ?") + stmt.setString(1, runName) + stmt.executeUpdate() + } catch (SQLException e) { + log.error("Error deleting cache index", e) + throw new RuntimeException("Failed to delete cache index", e) + } finally { + try { stmt?.close() } catch (SQLException e) { /* ignore */ } + } + } + + @Override + Iterator iterateIndex() { + try { + // Create a fresh statement for this iteration to avoid conflicts + def stmt = connection.prepareStatement("SELECT key, cached FROM cache_index WHERE run_name = ? ORDER BY id") + stmt.setString(1, runName) + final ResultSet rs = stmt.executeQuery() + + return new Iterator() { + private Index nextItem = null + private boolean fetched = false + + @Override + boolean hasNext() { + if (!fetched) { + nextItem = fetch() + fetched = true + } + return nextItem != null + } + + @Override + Index next() { + if (!fetched) { + nextItem = fetch() + fetched = true + } + final result = nextItem + nextItem = fetch() + fetched = true + return result + } + + private Index fetch() { + try { + if (rs.next()) { + final key = HashCode.fromBytes(rs.getBytes("key")) + final cached = rs.getBoolean("cached") + return new Index(key, cached) + } else { + try { + rs.close() + stmt.close() + } catch (SQLException e) { + // ignore cleanup errors + } + return null + } + } catch (SQLException e) { + log.error("Error reading cache index", e) + try { + rs.close() + stmt.close() + } catch (SQLException cleanupE) { + // ignore cleanup errors + } + throw new RuntimeException("Failed to read cache index", e) + } + } + } + } catch (SQLException e) { + log.error("Error iterating cache index", e) + throw new RuntimeException("Failed to iterate cache index", e) + } + } + + @Override + synchronized byte[] getEntry(HashCode key) { + PreparedStatement stmt = null + ResultSet rs = null + try { + stmt = connection.prepareStatement("SELECT value FROM cache_entries WHERE key = ?") + stmt.setBytes(1, key.asBytes()) + rs = stmt.executeQuery() + if (rs.next()) { + return rs.getBytes("value") + } + return null + } catch (SQLException e) { + log.error("Error getting cache entry", e) + throw new RuntimeException("Failed to get cache entry", e) + } finally { + try { rs?.close() } catch (SQLException e) { /* ignore */ } + try { stmt?.close() } catch (SQLException e) { /* ignore */ } + } + } + + @Override + synchronized void putEntry(HashCode key, byte[] value) { + PreparedStatement stmt = null + try { + stmt = connection.prepareStatement("INSERT OR REPLACE INTO cache_entries (key, value) VALUES (?, ?)") + stmt.setBytes(1, key.asBytes()) + stmt.setBytes(2, value) + stmt.executeUpdate() + } catch (SQLException e) { + log.error("Error putting cache entry", e) + throw new RuntimeException("Failed to put cache entry", e) + } finally { + try { stmt?.close() } catch (SQLException e) { /* ignore */ } + } + } + + @Override + synchronized void deleteEntry(HashCode key) { + PreparedStatement stmt = null + try { + stmt = connection.prepareStatement("DELETE FROM cache_entries WHERE key = ?") + stmt.setBytes(1, key.asBytes()) + stmt.executeUpdate() + } catch (SQLException e) { + log.error("Error deleting cache entry", e) + throw new RuntimeException("Failed to delete cache entry", e) + } finally { + try { stmt?.close() } catch (SQLException e) { /* ignore */ } + } + } +} \ No newline at end of file diff --git a/modules/nextflow/src/test/groovy/nextflow/cache/sqlite/SQLiteCacheStoreTest.groovy b/modules/nextflow/src/test/groovy/nextflow/cache/sqlite/SQLiteCacheStoreTest.groovy new file mode 100644 index 0000000000..63dbd4ec65 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/cache/sqlite/SQLiteCacheStoreTest.groovy @@ -0,0 +1,345 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cache.sqlite + +import java.nio.file.Files + +import nextflow.cache.CacheStore +import nextflow.exception.AbortOperationException +import nextflow.util.CacheHelper +import spock.lang.Specification + +/** + * Unit tests for SQLiteCacheStore + * + * @author Paolo Di Tommaso + */ +class SQLiteCacheStoreTest extends Specification { + + def 'should get and put cache entries' () { + given: + def folder = Files.createTempDirectory('test') + def uuid = UUID.randomUUID() + def runName = 'test_1' + and: + def store = new SQLiteCacheStore(uuid, runName, folder) + store.open() + + and: + def key1 = CacheHelper.hasher('ONE').hash() + def key2 = CacheHelper.hasher('TWO').hash() + def value = "Hello world" + + when: + store.putEntry(key1, value.bytes) + then: + new String(store.getEntry(key1)) == value + and: + store.getEntry(key2) == null + + when: + store.deleteEntry(key1) + then: + store.getEntry(key1) == null + + cleanup: + store?.close() + folder?.deleteDir() + } + + def 'should write and iterate cache index' () { + given: + def folder = Files.createTempDirectory('test') + def uuid = UUID.randomUUID() + def runName = 'test_run' + and: + def store = new SQLiteCacheStore(uuid, runName, folder) + store.open() + + and: + def key1 = CacheHelper.hasher('FIRST').hash() + def key2 = CacheHelper.hasher('SECOND').hash() + def key3 = CacheHelper.hasher('THIRD').hash() + + when: + store.writeIndex(key1, true) + store.writeIndex(key2, false) + store.writeIndex(key3, true) + + and: + def indexes = [] + def iterator = store.iterateIndex() + while (iterator.hasNext()) { + indexes << iterator.next() + } + + then: + indexes.size() == 3 + indexes[0].key == key1 + indexes[0].cached == true + indexes[1].key == key2 + indexes[1].cached == false + indexes[2].key == key3 + indexes[2].cached == true + + cleanup: + store?.close() + folder?.deleteDir() + } + + def 'should delete cache index' () { + given: + def folder = Files.createTempDirectory('test') + def uuid = UUID.randomUUID() + def runName = 'test_run' + and: + def store = new SQLiteCacheStore(uuid, runName, folder) + store.open() + + and: + def key1 = CacheHelper.hasher('FIRST').hash() + def key2 = CacheHelper.hasher('SECOND').hash() + + when: + store.writeIndex(key1, true) + store.writeIndex(key2, false) + + then: + def indexes = [] + def iterator = store.iterateIndex() + while (iterator.hasNext()) { + indexes << iterator.next() + } + indexes.size() == 2 + + when: + store.deleteIndex() + + and: + def indexes2 = [] + def iterator2 = store.iterateIndex() + while (iterator2.hasNext()) { + indexes2 << iterator2.next() + } + + then: + indexes2.size() == 0 + + cleanup: + store?.close() + folder?.deleteDir() + } + + def 'should handle different run names separately' () { + given: + def folder = Files.createTempDirectory('test') + def uuid = UUID.randomUUID() + def runName1 = 'test_run_1' + def runName2 = 'test_run_2' + + and: + def store1 = new SQLiteCacheStore(uuid, runName1, folder) + store1.open() + def store2 = new SQLiteCacheStore(uuid, runName2, folder) + store2.open() + + and: + def key1 = CacheHelper.hasher('KEY1').hash() + def key2 = CacheHelper.hasher('KEY2').hash() + + when: + store1.writeIndex(key1, true) + store2.writeIndex(key2, false) + + then: + def indexes1 = [] + def iterator1 = store1.iterateIndex() + while (iterator1.hasNext()) { + indexes1 << iterator1.next() + } + indexes1.size() == 1 + indexes1[0].key == key1 + + and: + def indexes2 = [] + def iterator2 = store2.iterateIndex() + while (iterator2.hasNext()) { + indexes2 << iterator2.next() + } + indexes2.size() == 1 + indexes2[0].key == key2 + + cleanup: + store1?.close() + store2?.close() + folder?.deleteDir() + } + + def 'should open for read when index exists' () { + given: + def folder = Files.createTempDirectory('test') + def uuid = UUID.randomUUID() + def runName = 'test_run' + + and: + def store1 = new SQLiteCacheStore(uuid, runName, folder) + store1.open() + def key1 = CacheHelper.hasher('KEY1').hash() + store1.writeIndex(key1, true) + store1.close() + + when: + def store2 = new SQLiteCacheStore(uuid, runName, folder) + store2.openForRead() + + then: + def indexes = [] + def iterator = store2.iterateIndex() + while (iterator.hasNext()) { + indexes << iterator.next() + } + indexes.size() == 1 + indexes[0].key == key1 + + cleanup: + store2?.close() + folder?.deleteDir() + } + + def 'should throw exception when opening for read without index' () { + given: + def folder = Files.createTempDirectory('test') + def uuid = UUID.randomUUID() + def runName = 'test_run' + and: + def store = new SQLiteCacheStore(uuid, runName, folder) + + when: + store.openForRead() + + then: + thrown(AbortOperationException) + + cleanup: + store?.close() + folder?.deleteDir() + } + + def 'should drop cache completely' () { + given: + def folder = Files.createTempDirectory('test') + def uuid = UUID.randomUUID() + def runName = 'test_run' + and: + def store = new SQLiteCacheStore(uuid, runName, folder) + store.open() + + and: + def key1 = CacheHelper.hasher('KEY1').hash() + def value = "Test value" + store.putEntry(key1, value.bytes) + store.writeIndex(key1, true) + + when: + def dataDir = store.dataDir + def exists1 = dataDir.exists() + store.drop() + def exists2 = dataDir.exists() + + then: + exists1 == true + exists2 == false + + cleanup: + folder?.deleteDir() + } + + def 'should handle multiple entries and maintain consistency' () { + given: + def folder = Files.createTempDirectory('test') + def uuid = UUID.randomUUID() + def runName = 'test_run' + and: + def store = new SQLiteCacheStore(uuid, runName, folder) + store.open() + + and: + def entries = [:] + (1..100).each { i -> + def key = CacheHelper.hasher("KEY_$i").hash() + def value = "Value for entry $i" + entries[key] = value + store.putEntry(key, value.bytes) + store.writeIndex(key, i % 2 == 0) // alternate between cached/not cached + } + + when: + def retrievedEntries = [:] + entries.each { key, expectedValue -> + def actualValue = new String(store.getEntry(key)) + retrievedEntries[key] = actualValue + } + + and: + def indexes = [] + def iterator = store.iterateIndex() + while (iterator.hasNext()) { + indexes << iterator.next() + } + + then: + retrievedEntries == entries + indexes.size() == 100 + indexes.findAll { it.cached }.size() == 50 + indexes.findAll { !it.cached }.size() == 50 + + cleanup: + store?.close() + folder?.deleteDir() + } + + def 'should handle concurrent access' () { + given: + def folder = Files.createTempDirectory('test') + def uuid = UUID.randomUUID() + def runName = 'test_run' + and: + def store = new SQLiteCacheStore(uuid, runName, folder) + store.open() + + and: + def key = CacheHelper.hasher('CONCURRENT_KEY').hash() + def value = "Concurrent value" + + when: + // Simulate concurrent writes + 10.times { i -> + store.putEntry(key, "${value}_${i}".bytes) + } + + and: + def finalValue = new String(store.getEntry(key)) + + then: + finalValue.startsWith(value) + + cleanup: + store?.close() + folder?.deleteDir() + } +} \ No newline at end of file From eb6d2719fb8cefbae78b9c41ab5caf581d2aeafb Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 14 Jul 2025 12:49:33 +0200 Subject: [PATCH 2/2] Improve pragma handling [ci fast] Signed-off-by: Paolo Di Tommaso --- .../nextflow/cache/sqlite/SQLiteCacheStore.groovy | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/cache/sqlite/SQLiteCacheStore.groovy b/modules/nextflow/src/main/groovy/nextflow/cache/sqlite/SQLiteCacheStore.groovy index 5affcac26f..61a245572b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/cache/sqlite/SQLiteCacheStore.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/cache/sqlite/SQLiteCacheStore.groovy @@ -87,11 +87,13 @@ class SQLiteCacheStore implements CacheStore { // Configure SQLite for better performance and concurrency def stmt = connection.createStatement() - stmt.execute("PRAGMA journal_mode=WAL;") - stmt.execute("PRAGMA synchronous=NORMAL;") - stmt.execute("PRAGMA cache_size=10000;") - stmt.execute("PRAGMA temp_store=MEMORY;") - stmt.execute("PRAGMA busy_timeout=30000;") // 30 seconds timeout + stmt.execute(""" + PRAGMA journal_mode=WAL; + PRAGMA synchronous=NORMAL; + PRAGMA cache_size=10000; + PRAGMA temp_store=MEMORY; + PRAGMA busy_timeout=30000; + """) stmt.close() // Create tables if they don't exist