From 798c65b349764fbca377c8a7c1562b2ac08901d6 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sat, 15 Mar 2025 17:42:44 +0100 Subject: [PATCH 1/4] Seqera scheduler first commit [ci fast] Signed-off-by: Paolo Di Tommaso --- plugins/nf-seqera/build.gradle | 42 ++++ .../main/io/seqera/client/SeqeraClient.groovy | 180 ++++++++++++++++++ .../main/io/seqera/config/RetryOpts.groovy | 49 +++++ .../main/io/seqera/config/SeqeraConfig.groovy | 47 +++++ .../exception/BadResponseException.groovy | 29 +++ .../exception/UnauthorizedException.groovy | 29 +++ .../io/seqera/executor/SeqeraExecutor.groovy | 78 ++++++++ .../seqera/executor/SeqeraTaskHandler.groovy | 148 ++++++++++++++ .../main/io/seqera/plugin/SeqeraPlugin.groovy | 35 ++++ .../src/main/io/seqera/serde/Serde.groovy | 80 ++++++++ .../src/resources/META-INF/MANIFEST.MF | 6 + .../src/resources/META-INF/extensions.idx | 18 ++ .../io/seqera/config/RetryOptsTest.groovy | 45 +++++ settings.gradle | 3 + 14 files changed, 789 insertions(+) create mode 100644 plugins/nf-seqera/build.gradle create mode 100644 plugins/nf-seqera/src/main/io/seqera/client/SeqeraClient.groovy create mode 100644 plugins/nf-seqera/src/main/io/seqera/config/RetryOpts.groovy create mode 100644 plugins/nf-seqera/src/main/io/seqera/config/SeqeraConfig.groovy create mode 100644 plugins/nf-seqera/src/main/io/seqera/exception/BadResponseException.groovy create mode 100644 plugins/nf-seqera/src/main/io/seqera/exception/UnauthorizedException.groovy create mode 100644 plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy create mode 100644 plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy create mode 100644 plugins/nf-seqera/src/main/io/seqera/plugin/SeqeraPlugin.groovy create mode 100644 plugins/nf-seqera/src/main/io/seqera/serde/Serde.groovy create mode 100644 plugins/nf-seqera/src/resources/META-INF/MANIFEST.MF create mode 100644 plugins/nf-seqera/src/resources/META-INF/extensions.idx create mode 100644 plugins/nf-seqera/src/test/io/seqera/config/RetryOptsTest.groovy diff --git a/plugins/nf-seqera/build.gradle b/plugins/nf-seqera/build.gradle new file mode 100644 index 0000000000..75fceb9fba --- /dev/null +++ b/plugins/nf-seqera/build.gradle @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019, Seqera Labs. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This Source Code Form is "Incompatible With Secondary Licenses", as + * defined by the Mozilla Public License, v. 2.0. + */ + +apply plugin: 'java' +apply plugin: 'java-test-fixtures' +apply plugin: 'idea' +apply plugin: 'groovy' + +sourceSets { + main.java.srcDirs = [] + main.groovy.srcDirs = ['src/main'] + main.resources.srcDirs = ['src/resources'] + test.groovy.srcDirs = ['src/test'] + test.java.srcDirs = [] + test.resources.srcDirs = [] +} + +configurations { + // see https://docs.gradle.org/4.1/userguide/dependency_management.html#sub:exclude_transitive_dependencies + runtimeClasspath.exclude group: 'org.slf4j', module: 'slf4j-api' +} + +dependencies { + compileOnly project(':nextflow') + compileOnly 'org.slf4j:slf4j-api:2.0.16' + compileOnly 'org.pf4j:pf4j:3.12.0' + api 'io.seqera:sched-api:0.1.0' + implementation 'com.squareup.moshi:moshi:1.15.2' + implementation 'com.squareup.moshi:moshi-adapters:1.15.2' + + testImplementation(testFixtures(project(":nextflow"))) + testImplementation "org.apache.groovy:groovy:4.0.26" + testImplementation "org.apache.groovy:groovy-nio:4.0.26" +} diff --git a/plugins/nf-seqera/src/main/io/seqera/client/SeqeraClient.groovy b/plugins/nf-seqera/src/main/io/seqera/client/SeqeraClient.groovy new file mode 100644 index 0000000000..a8402f2cb8 --- /dev/null +++ b/plugins/nf-seqera/src/main/io/seqera/client/SeqeraClient.groovy @@ -0,0 +1,180 @@ +/* + * Copyright 2013-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.client + +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse +import java.time.Duration +import java.time.temporal.ChronoUnit +import java.util.concurrent.Executors +import java.util.function.Predicate + +import dev.failsafe.Failsafe +import dev.failsafe.RetryPolicy +import dev.failsafe.event.EventListener +import dev.failsafe.event.ExecutionAttemptedEvent +import dev.failsafe.function.CheckedSupplier +import groovy.json.JsonOutput +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import io.seqera.config.SeqeraConfig +import io.seqera.exception.BadResponseException +import io.seqera.sched.api.v1a1.CancelJobResponse +import io.seqera.sched.api.v1a1.CreateJobRequest +import io.seqera.sched.api.v1a1.CreateJobResponse +import io.seqera.sched.api.v1a1.DescribeJobResponse +import io.seqera.sched.api.v1a1.GetJobLogsResponse +import io.seqera.serde.Serde +import io.seqera.util.trace.TraceUtils +import nextflow.Session +import nextflow.util.Threads +/** + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +class SeqeraClient { + + private HttpClient httpClient + + private CookieManager cookieManager + + private SeqeraConfig config + + SeqeraClient(Session session) { + this.config = new SeqeraConfig(session.config.seqera as Map ?: Collections.emptyMap()) + // the cookie manager + cookieManager = new CookieManager() + // create http client + this.httpClient = newHttpClient() + } + + protected HttpClient newHttpClient() { + final builder = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .followRedirects(HttpClient.Redirect.NORMAL) + .connectTimeout(Duration.ofSeconds(10)) + .cookieHandler(cookieManager) + // use virtual threads executor if enabled + if( Threads.useVirtual() ) + builder.executor(Executors.newVirtualThreadPerTaskExecutor()) + // build and return the new client + return builder.build() + } + + protected R get0(String uri, Class responseType) { + final trace = TraceUtils.rndTrace() + final req = HttpRequest.newBuilder() + .uri(URI.create(uri)) + .headers('Content-Type','application/json', 'Traceparent', trace) + .GET() + .build() + send0(req, responseType) + } + + protected R delete0(String uri, Class response) { + final trace = TraceUtils.rndTrace() + final req = HttpRequest.newBuilder() + .uri(URI.create(uri)) + .headers('Content-Type','application/json', 'Traceparent', trace) + .DELETE() + .build() + return send0(req, response) + } + + protected R post0(String uri, T request, Class response) { + final trace = TraceUtils.rndTrace() + final body = JsonOutput.toJson(request) + final req = HttpRequest.newBuilder() + .uri(URI.create(uri)) + .headers('Content-Type','application/json', 'Traceparent', trace) + .POST(HttpRequest.BodyPublishers.ofString(body)) + .build() + return send0(req, response) + } + + protected T send0(HttpRequest req, Class type) { + final HttpResponse resp = httpSend(req); + log.debug("Seqera status response: statusCode={}; body={}", resp.statusCode(), resp.body()) + if( resp.statusCode()==200 ) { + return Serde.fromJson(resp.body(), type) + } + else { + final msg = "Seqera invalid response: ${req.method()} ${req.uri()} [${resp.statusCode()}] ${resp.body()}" + throw new BadResponseException(msg) + } + } + + CreateJobResponse createJob(CreateJobRequest request) { + final uri = "${config.endpoint}/v1a1/jobs" + return post0(uri, request, CreateJobResponse) + } + + DescribeJobResponse describeJob(String jobId) { + final uri = "${config.endpoint}/v1a1/jobs/${jobId}" + return get0(uri, DescribeJobResponse) + } + + CancelJobResponse cancelJob(String jobId) { + final uri = "${config.endpoint}/v1a1/jobs/${jobId}" + return delete0(uri, CancelJobResponse) + } + + GetJobLogsResponse getJobLogs(String jobId) { + final uri = "${config.endpoint}/v1a1/jobs/${jobId}/logs" + return get0(uri, GetJobLogsResponse) + } + + protected RetryPolicy retryPolicy(Predicate cond, Predicate handle) { + final cfg = config.retryOpts() + final listener = new EventListener>() { + @Override + void accept(ExecutionAttemptedEvent event) throws Throwable { + def msg = "Seqera connection failure - attempt: ${event.attemptCount}" + if( event.lastResult!=null ) + msg += "; response: ${event.lastResult}" + if( event.lastFailure != null ) + msg += "; exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}" + log.debug(msg) + } + } + return RetryPolicy.builder() + .handleIf(cond) + .handleResultIf(handle) + .withBackoff(cfg.delay.toMillis(), cfg.maxDelay.toMillis(), ChronoUnit.MILLIS) + .withMaxAttempts(cfg.maxAttempts) + .withJitter(cfg.jitter) + .onRetry(listener) + .build() + } + + protected HttpResponse safeApply(CheckedSupplier action) { + final retryOnException = (e -> e instanceof IOException) as Predicate + final retryOnStatusCode = ((HttpResponse resp) -> resp.statusCode() in SERVER_ERRORS) as Predicate> + final policy = retryPolicy(retryOnException, retryOnStatusCode) + return Failsafe.with(policy).get(action) + } + + static private final List SERVER_ERRORS = [429,500,502,503,504] + + protected HttpResponse httpSend(HttpRequest req) { + return safeApply(() -> httpClient.send(req, HttpResponse.BodyHandlers.ofString())) + } +} diff --git a/plugins/nf-seqera/src/main/io/seqera/config/RetryOpts.groovy b/plugins/nf-seqera/src/main/io/seqera/config/RetryOpts.groovy new file mode 100644 index 0000000000..931fce6454 --- /dev/null +++ b/plugins/nf-seqera/src/main/io/seqera/config/RetryOpts.groovy @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.config + +import groovy.transform.CompileStatic +import groovy.transform.ToString +import nextflow.util.Duration + +/** + * Model retry options for Wave http requests + */ +@ToString(includeNames = true, includePackage = false) +@CompileStatic +class RetryOpts { + Duration delay = Duration.of('450ms') + Duration maxDelay = Duration.of('90s') + int maxAttempts = 10 + double jitter = 0.25 + + RetryOpts() { + this(Collections.emptyMap()) + } + + RetryOpts(Map config) { + if( config.delay ) + delay = config.delay as Duration + if( config.maxDelay ) + maxDelay = config.maxDelay as Duration + if( config.maxAttempts ) + maxAttempts = config.maxAttempts as int + if( config.jitter ) + jitter = config.jitter as double + } +} diff --git a/plugins/nf-seqera/src/main/io/seqera/config/SeqeraConfig.groovy b/plugins/nf-seqera/src/main/io/seqera/config/SeqeraConfig.groovy new file mode 100644 index 0000000000..0a14ca3626 --- /dev/null +++ b/plugins/nf-seqera/src/main/io/seqera/config/SeqeraConfig.groovy @@ -0,0 +1,47 @@ +/* + * Copyright 2013-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.config + +import groovy.transform.CompileStatic + +/** + * + * @author Paolo Di Tommaso + */ +@CompileStatic +class SeqeraConfig { + + private RetryOpts retryOpts + + private String endpoint + + SeqeraConfig(Map opts) { + this.retryOpts = new RetryOpts(opts.retryPolicy as Map ?: Map.of()) + this.endpoint = opts.endpoint as String + if( !endpoint ) + throw new IllegalArgumentException("Missing Seqera endpoint - make sure to specify 'seqera.endpoint' settings") + } + + RetryOpts retryOpts() { + this.retryOpts + } + + String getEndpoint() { + return endpoint + } +} diff --git a/plugins/nf-seqera/src/main/io/seqera/exception/BadResponseException.groovy b/plugins/nf-seqera/src/main/io/seqera/exception/BadResponseException.groovy new file mode 100644 index 0000000000..fcca3ad172 --- /dev/null +++ b/plugins/nf-seqera/src/main/io/seqera/exception/BadResponseException.groovy @@ -0,0 +1,29 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.exception + +import groovy.transform.InheritConstructors + +/** + * Model an invalid HTTP response + * + * @author Paolo Di Tommaso + */ +@InheritConstructors +class BadResponseException extends RuntimeException { +} diff --git a/plugins/nf-seqera/src/main/io/seqera/exception/UnauthorizedException.groovy b/plugins/nf-seqera/src/main/io/seqera/exception/UnauthorizedException.groovy new file mode 100644 index 0000000000..fa68269949 --- /dev/null +++ b/plugins/nf-seqera/src/main/io/seqera/exception/UnauthorizedException.groovy @@ -0,0 +1,29 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.exception + +import groovy.transform.InheritConstructors + +/** + * Unauthorized access exception + * + * @author Paolo Di Tommaso + */ +@InheritConstructors +class UnauthorizedException extends RuntimeException { +} diff --git a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy new file mode 100644 index 0000000000..f59dd0142f --- /dev/null +++ b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy @@ -0,0 +1,78 @@ +/* + * Copyright 2013-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.executor + +import io.seqera.client.SeqeraClient +import nextflow.exception.AbortOperationException +import nextflow.executor.Executor +import nextflow.fusion.FusionHelper +import nextflow.processor.TaskHandler +import nextflow.processor.TaskMonitor +import nextflow.processor.TaskPollingMonitor +import nextflow.processor.TaskRun +import nextflow.util.Duration +import nextflow.util.ServiceName +import org.pf4j.ExtensionPoint + +/** + * + * @author Paolo Di Tommaso + */ +@ServiceName("seqera") +class SeqeraExecutor extends Executor implements ExtensionPoint { + + private SeqeraClient client + + @Override + protected void register() { + createClient() + } + + protected void createClient() { + this.client = new SeqeraClient(session) + } + + @Override + protected TaskMonitor createTaskMonitor() { + TaskPollingMonitor.create(session, name, 1000, Duration.of('10 sec')) + } + + @Override + TaskHandler createTaskHandler(TaskRun task) { + return new SeqeraTaskHandler(task, this) + } + + /** + * @return {@code true} whenever the containerization is managed by the executor itself + */ + boolean isContainerNative() { + return true + } + + @Override + boolean isFusionEnabled() { + final enabled = FusionHelper.isFusionEnabled(session) + if( !enabled ) + throw new AbortOperationException("Seqera executor requires the use of Fusion file system") + return true + } + + SeqeraClient getClient() { + return client + } +} diff --git a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy new file mode 100644 index 0000000000..7d80c35647 --- /dev/null +++ b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy @@ -0,0 +1,148 @@ +/* + * Copyright 2013-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.executor + +import java.nio.file.Path + +import groovy.transform.CompileStatic +import groovy.transform.PackageScope +import groovy.util.logging.Slf4j +import io.seqera.client.SeqeraClient +import io.seqera.sched.api.v1a1.CreateJobRequest +import io.seqera.sched.api.v1a1.GetJobLogsResponse +import io.seqera.sched.api.v1a1.JobState +import nextflow.fusion.FusionAwareTask +import nextflow.processor.TaskHandler +import nextflow.processor.TaskRun +import nextflow.processor.TaskStatus +/** + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask { + + private SeqeraClient client + + private SeqeraExecutor executor + + private Path exitFile + + private Path outputFile + + private Path errorFile + + private String jobId + + SeqeraTaskHandler(TaskRun task, SeqeraExecutor executor) { + super(task) + this.client = executor.getClient() + this.executor = executor + // those files are access via NF runtime, keep based on CloudStoragePath + this.outputFile = task.workDir.resolve(TaskRun.CMD_OUTFILE) + this.errorFile = task.workDir.resolve(TaskRun.CMD_ERRFILE) + this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT) + } + + @Override + void prepareLauncher() { + assert fusionEnabled() + final launcher = fusionLauncher() + launcher.build() + } + + @Override + void submit() { + final req = new CreateJobRequest() + .withCommand(fusionSubmitCli()) + .withImage(task.getContainer()) + .withEnvironment(fusionLauncher().fusionEnv()) + .withPlatform(task.getContainerPlatform()) + log.debug "[SEQERA] Submitting job request=${req}" + final resp = client.createJob(req) + this.jobId = resp.jobId + this.status = TaskStatus.SUBMITTED + } + + protected JobState jobState() { + return client + .describeJob(jobId) + .jobsState + } + + @Override + boolean checkIfRunning() { + if(isSubmitted()) { + final state = jobState() + log.debug "[SEQERA] checkIfRunning job=${jobId}; state=${state}" + if( state.isRunningOrTerminated() ) { + status = TaskStatus.RUNNING + return true + } + } + return false + } + + @Override + boolean checkIfCompleted() { + final state = jobState() + log.debug "[SEQERA] checkIfCompleted state=${state}" + if( state.isTerminated() ) { + log.debug "[SEQERA] Process `${task.lazyName()}` - terminated job=$jobId; state=$state" + // finalize the task + task.exitStatus = readExitFile() + if( state.isFailed() ) { + final logs = getJobLogs(jobId) + task.stdout = logs?.stdout ?: outputFile + task.stderr = logs?.stderr ?: errorFile + } + else { + task.stdout = outputFile + task.stderr = errorFile + } + status = TaskStatus.COMPLETED + return true + } + + return false + } + + protected GetJobLogsResponse getJobLogs(String jobId) { + return client.getJobLogs(jobId) + } + + @Override + protected void killTask() { + log.debug "[SEQERA] Kill job=${jobId}" + client.cancelJob(jobId) + } + + @PackageScope Integer readExitFile() { + try { + final result = exitFile.text as Integer + log.trace "[SEQERA] Read exit file for job $jobId; exit=${result}" + return result + } + catch (Exception e) { + log.debug "[SEQERA] Cannot read exit status for task: `${task.lazyName()}` - ${e.message}" + // return MAX_VALUE to signal it was unable to retrieve the exit code + return Integer.MAX_VALUE + } + } +} diff --git a/plugins/nf-seqera/src/main/io/seqera/plugin/SeqeraPlugin.groovy b/plugins/nf-seqera/src/main/io/seqera/plugin/SeqeraPlugin.groovy new file mode 100644 index 0000000000..335bcc1f4a --- /dev/null +++ b/plugins/nf-seqera/src/main/io/seqera/plugin/SeqeraPlugin.groovy @@ -0,0 +1,35 @@ +/* + * Copyright 2013-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.plugin + +import groovy.transform.CompileStatic +import nextflow.plugin.BasePlugin +import org.pf4j.PluginWrapper + +/** + * Seqera plugin entry point + * + * @author Paolo Di Tommaso + */ +@CompileStatic +class SeqeraPlugin extends BasePlugin { + + SeqeraPlugin(PluginWrapper wrapper) { + super(wrapper) + } +} diff --git a/plugins/nf-seqera/src/main/io/seqera/serde/Serde.groovy b/plugins/nf-seqera/src/main/io/seqera/serde/Serde.groovy new file mode 100644 index 0000000000..f3d092de8a --- /dev/null +++ b/plugins/nf-seqera/src/main/io/seqera/serde/Serde.groovy @@ -0,0 +1,80 @@ +/* + * Copyright 2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.serde + + +import java.time.Instant + +import com.squareup.moshi.Moshi +/** + * Implement model serialization-deserialization helpers + * + * @author Paolo Di Tommaso + */ +class Serde { + + static Boolean asBoolean(Object value) { + if( value==null ) + return null; + if( value instanceof Boolean ) + return (Boolean) value; + return Boolean.valueOf(value.toString()); + } + + static Integer asInteger(Object value) { + if( value==null ) + return null; + if( value instanceof Number ) + return ((Number) value).intValue(); + if( value instanceof CharSequence ) + return Integer.parseInt(value.toString()); + throw new IllegalArgumentException("Illegal Integer value: " + value); + } + + static Long asLong(Object value) { + if( value==null ) + return null; + if( value instanceof Number ) + return ((Number) value).longValue(); + if( value instanceof CharSequence ) + return Long.parseLong(value.toString()); + throw new IllegalArgumentException("Illegal Long value: " + value); + } + + static Instant asInstant(Object value) { + if( value==null ) + return null; + if( value instanceof Instant ) + return (Instant) value; + if( value instanceof CharSequence ) + return Instant.parse(value.toString()); + throw new IllegalArgumentException("Illegal Instant value: " + value); + } + + + static T fromJson(String json, Class type) { + Moshi moshi = new Moshi.Builder().build(); + try { + return moshi.adapter(type).fromJson(json); + } catch (IOException e) { + final String msg = String.format("Unable to parse JSON to %s - offending value: %s", type, json); + throw new RuntimeException(msg, e); + } + } + +} diff --git a/plugins/nf-seqera/src/resources/META-INF/MANIFEST.MF b/plugins/nf-seqera/src/resources/META-INF/MANIFEST.MF new file mode 100644 index 0000000000..36759290d8 --- /dev/null +++ b/plugins/nf-seqera/src/resources/META-INF/MANIFEST.MF @@ -0,0 +1,6 @@ +Manifest-Version: 1.0 +Plugin-Class: io.seqera.plugin.SeqeraPlugin +Plugin-Id: nf-seqera +Plugin-Version: 0.1.0 +Plugin-Provider: Seqera Labs +Plugin-Requires: >=25.01.0-edge diff --git a/plugins/nf-seqera/src/resources/META-INF/extensions.idx b/plugins/nf-seqera/src/resources/META-INF/extensions.idx new file mode 100644 index 0000000000..4792b1a886 --- /dev/null +++ b/plugins/nf-seqera/src/resources/META-INF/extensions.idx @@ -0,0 +1,18 @@ +# +# Copyright 2013-2024, Seqera Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +io.seqera.executor.SeqeraExecutor + diff --git a/plugins/nf-seqera/src/test/io/seqera/config/RetryOptsTest.groovy b/plugins/nf-seqera/src/test/io/seqera/config/RetryOptsTest.groovy new file mode 100644 index 0000000000..57c3f3e72a --- /dev/null +++ b/plugins/nf-seqera/src/test/io/seqera/config/RetryOptsTest.groovy @@ -0,0 +1,45 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.config + +import nextflow.util.Duration +import spock.lang.Specification + +/** + * + * @author Paolo Di Tommaso + */ +class RetryOptsTest extends Specification { + + def 'should create retry config' () { + + expect: + new RetryOpts().delay == Duration.of('450ms') + new RetryOpts().maxDelay == Duration.of('90s') + new RetryOpts().maxAttempts == 10 + new RetryOpts().jitter == 0.25d + + and: + new RetryOpts([maxAttempts: 20]).maxAttempts == 20 + new RetryOpts([delay: '1s']).delay == Duration.of('1s') + new RetryOpts([maxDelay: '1m']).maxDelay == Duration.of('1m') + new RetryOpts([jitter: '0.5']).jitter == 0.5d + + } + +} diff --git a/settings.gradle b/settings.gradle index 53d56ba13b..9951715b92 100644 --- a/settings.gradle +++ b/settings.gradle @@ -43,3 +43,6 @@ include 'plugins:nf-codecommit' include 'plugins:nf-wave' include 'plugins:nf-cloudcache' include 'plugins:nf-k8s' +include 'plugins:nf-seqera' + +includeBuild '../libseqera' From 58a68c03ebef9958feb3bf3c56786c8e66b315f1 Mon Sep 17 00:00:00 2001 From: Lorenzo Fontana Date: Thu, 3 Jul 2025 18:17:05 +0200 Subject: [PATCH 2/4] Seqera scheduler create cluster [ci fast] --- .../main/io/seqera/client/SeqeraClient.groovy | 57 ++++++++++++++----- .../main/io/seqera/config/SeqeraConfig.groovy | 20 ++++++- .../io/seqera/executor/SeqeraExecutor.groovy | 18 +++++- .../seqera/executor/SeqeraTaskHandler.groovy | 1 + 4 files changed, 79 insertions(+), 17 deletions(-) diff --git a/plugins/nf-seqera/src/main/io/seqera/client/SeqeraClient.groovy b/plugins/nf-seqera/src/main/io/seqera/client/SeqeraClient.groovy index a8402f2cb8..d8cf277cbd 100644 --- a/plugins/nf-seqera/src/main/io/seqera/client/SeqeraClient.groovy +++ b/plugins/nf-seqera/src/main/io/seqera/client/SeqeraClient.groovy @@ -17,14 +17,6 @@ package io.seqera.client -import java.net.http.HttpClient -import java.net.http.HttpRequest -import java.net.http.HttpResponse -import java.time.Duration -import java.time.temporal.ChronoUnit -import java.util.concurrent.Executors -import java.util.function.Predicate - import dev.failsafe.Failsafe import dev.failsafe.RetryPolicy import dev.failsafe.event.EventListener @@ -35,15 +27,20 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.seqera.config.SeqeraConfig import io.seqera.exception.BadResponseException -import io.seqera.sched.api.v1a1.CancelJobResponse -import io.seqera.sched.api.v1a1.CreateJobRequest -import io.seqera.sched.api.v1a1.CreateJobResponse -import io.seqera.sched.api.v1a1.DescribeJobResponse -import io.seqera.sched.api.v1a1.GetJobLogsResponse +import io.seqera.sched.api.v1a1.* import io.seqera.serde.Serde import io.seqera.util.trace.TraceUtils import nextflow.Session import nextflow.util.Threads + +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse +import java.time.Duration +import java.time.temporal.ChronoUnit +import java.util.concurrent.Executors +import java.util.function.Predicate + /** * * @author Paolo Di Tommaso @@ -99,6 +96,17 @@ class SeqeraClient { return send0(req, response) } + protected R delete0WithBody(String uri, T request, Class response) { + final trace = TraceUtils.rndTrace() + final body = JsonOutput.toJson(request) + final req = HttpRequest.newBuilder() + .uri(URI.create(uri)) + .headers('Content-Type','application/json', 'Traceparent', trace) + .method("DELETE", HttpRequest.BodyPublishers.ofString(body)) + .build() + return send0(req, response) + } + protected R post0(String uri, T request, Class response) { final trace = TraceUtils.rndTrace() final body = JsonOutput.toJson(request) @@ -142,6 +150,29 @@ class SeqeraClient { return get0(uri, GetJobLogsResponse) } + CreateClusterResponse createCluster() { + final uri = "${config.endpoint}/v1a1/cluster" + final request = [region: this.config.getRegion(), keyName: this.config.getKeyPairName()] + return post0(uri, request, CreateClusterResponse) + } + + Map getClusterInfo(String clusterId) { + final uri = "${config.endpoint}/v1a1/cluster/${clusterId}" + return get0(uri, Map) + } + + Map deleteCluster(String clusterId) { + final uri = "${config.endpoint}/v1a1/cluster" + final request = [clusterId: clusterId, region: this.config.getRegion()] + return delete0WithBody(uri, request, Map) + } + + Map addNodeToCluster(String clusterId, String instanceType) { + final uri = "${config.endpoint}/v1a1/cluster/${clusterId}/add" + final request = [clusterId: clusterId, instanceType: instanceType] + return post0(uri, request, Map) + } + protected RetryPolicy retryPolicy(Predicate cond, Predicate handle) { final cfg = config.retryOpts() final listener = new EventListener>() { diff --git a/plugins/nf-seqera/src/main/io/seqera/config/SeqeraConfig.groovy b/plugins/nf-seqera/src/main/io/seqera/config/SeqeraConfig.groovy index 0a14ca3626..c84ca4a1cb 100644 --- a/plugins/nf-seqera/src/main/io/seqera/config/SeqeraConfig.groovy +++ b/plugins/nf-seqera/src/main/io/seqera/config/SeqeraConfig.groovy @@ -30,11 +30,21 @@ class SeqeraConfig { private String endpoint + private String region + + private String keyPairName + SeqeraConfig(Map opts) { this.retryOpts = new RetryOpts(opts.retryPolicy as Map ?: Map.of()) this.endpoint = opts.endpoint as String - if( !endpoint ) + if (!endpoint) throw new IllegalArgumentException("Missing Seqera endpoint - make sure to specify 'seqera.endpoint' settings") + + this.region = opts.region as String + if (!region) + region = "eu-central-1" + + this.keyPairName = opts.keyPairName as String } RetryOpts retryOpts() { @@ -44,4 +54,12 @@ class SeqeraConfig { String getEndpoint() { return endpoint } + + String getRegion() { + return region + } + + String getKeyPairName() { + return keyPairName + } } diff --git a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy index f59dd0142f..01863ff845 100644 --- a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy +++ b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy @@ -17,6 +17,7 @@ package io.seqera.executor +import groovy.util.logging.Slf4j import io.seqera.client.SeqeraClient import nextflow.exception.AbortOperationException import nextflow.executor.Executor @@ -33,11 +34,14 @@ import org.pf4j.ExtensionPoint * * @author Paolo Di Tommaso */ -@ServiceName("seqera") +@Slf4j +@ServiceName('seqera') class SeqeraExecutor extends Executor implements ExtensionPoint { private SeqeraClient client + private String clusterId + @Override protected void register() { createClient() @@ -45,6 +49,10 @@ class SeqeraExecutor extends Executor implements ExtensionPoint { protected void createClient() { this.client = new SeqeraClient(session) + log.debug "[SEQERA] Creating cluster for workflow" + final cluster = client.createCluster() + this.clusterId = cluster.clusterId + log.debug "[SEQERA] Cluster created id: " + cluster.clusterId } @Override @@ -54,7 +62,7 @@ class SeqeraExecutor extends Executor implements ExtensionPoint { @Override TaskHandler createTaskHandler(TaskRun task) { - return new SeqeraTaskHandler(task, this) + return new SeqeraTaskHandler(task, this) } /** @@ -67,7 +75,7 @@ class SeqeraExecutor extends Executor implements ExtensionPoint { @Override boolean isFusionEnabled() { final enabled = FusionHelper.isFusionEnabled(session) - if( !enabled ) + if (!enabled) throw new AbortOperationException("Seqera executor requires the use of Fusion file system") return true } @@ -75,4 +83,8 @@ class SeqeraExecutor extends Executor implements ExtensionPoint { SeqeraClient getClient() { return client } + + String getClusterId() { + return clusterId + } } diff --git a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy index 7d80c35647..5030fa9562 100644 --- a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy +++ b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy @@ -72,6 +72,7 @@ class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask { final req = new CreateJobRequest() .withCommand(fusionSubmitCli()) .withImage(task.getContainer()) + .withClusterId(executor.getClusterId()) .withEnvironment(fusionLauncher().fusionEnv()) .withPlatform(task.getContainerPlatform()) log.debug "[SEQERA] Submitting job request=${req}" From 8b6863e9158b8f666911ca5c2709b9ceb4dd528c Mon Sep 17 00:00:00 2001 From: Lorenzo Fontana Date: Fri, 4 Jul 2025 11:40:35 +0200 Subject: [PATCH 3/4] new(plugins/nf-seqera): cpu and memory --- .../io/seqera/executor/SeqeraExecutor.groovy | 19 +++++++++++++++-- .../seqera/executor/SeqeraTaskHandler.groovy | 21 +++++++++++-------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy index 01863ff845..cf7f2914d5 100644 --- a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy +++ b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy @@ -45,16 +45,31 @@ class SeqeraExecutor extends Executor implements ExtensionPoint { @Override protected void register() { createClient() + createCluster() } - protected void createClient() { - this.client = new SeqeraClient(session) + @Override + public void shutdown() { + deleteCluster() + } + + protected void createCluster() { log.debug "[SEQERA] Creating cluster for workflow" final cluster = client.createCluster() this.clusterId = cluster.clusterId log.debug "[SEQERA] Cluster created id: " + cluster.clusterId } + protected void deleteCluster() { + log.debug "[SEQERA] Deleting cluster: " + clusterId + client.deleteCluster(this.clusterId) + log.debug "[SEQERA] Cluster id deleted" + } + + protected void createClient() { + this.client = new SeqeraClient(session) + } + @Override protected TaskMonitor createTaskMonitor() { TaskPollingMonitor.create(session, name, 1000, Duration.of('10 sec')) diff --git a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy index 5030fa9562..0eab996976 100644 --- a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy +++ b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy @@ -17,8 +17,6 @@ package io.seqera.executor -import java.nio.file.Path - import groovy.transform.CompileStatic import groovy.transform.PackageScope import groovy.util.logging.Slf4j @@ -30,6 +28,9 @@ import nextflow.fusion.FusionAwareTask import nextflow.processor.TaskHandler import nextflow.processor.TaskRun import nextflow.processor.TaskStatus + +import java.nio.file.Path + /** * * @author Paolo Di Tommaso @@ -75,6 +76,8 @@ class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask { .withClusterId(executor.getClusterId()) .withEnvironment(fusionLauncher().fusionEnv()) .withPlatform(task.getContainerPlatform()) + .withCpus(task.config.getCpus()) + .withMemory(task.config.getMemory().toBytes()) log.debug "[SEQERA] Submitting job request=${req}" final resp = client.createJob(req) this.jobId = resp.jobId @@ -89,10 +92,10 @@ class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask { @Override boolean checkIfRunning() { - if(isSubmitted()) { + if (isSubmitted()) { final state = jobState() log.debug "[SEQERA] checkIfRunning job=${jobId}; state=${state}" - if( state.isRunningOrTerminated() ) { + if (state.isRunningOrTerminated()) { status = TaskStatus.RUNNING return true } @@ -104,16 +107,15 @@ class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask { boolean checkIfCompleted() { final state = jobState() log.debug "[SEQERA] checkIfCompleted state=${state}" - if( state.isTerminated() ) { + if (state.isTerminated()) { log.debug "[SEQERA] Process `${task.lazyName()}` - terminated job=$jobId; state=$state" // finalize the task task.exitStatus = readExitFile() - if( state.isFailed() ) { + if (state.isFailed()) { final logs = getJobLogs(jobId) task.stdout = logs?.stdout ?: outputFile task.stderr = logs?.stderr ?: errorFile - } - else { + } else { task.stdout = outputFile task.stderr = errorFile } @@ -134,7 +136,8 @@ class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask { client.cancelJob(jobId) } - @PackageScope Integer readExitFile() { + @PackageScope + Integer readExitFile() { try { final result = exitFile.text as Integer log.trace "[SEQERA] Read exit file for job $jobId; exit=${result}" From e2d725289ce16a87b757f9f00e2bd4dfdb9cd611 Mon Sep 17 00:00:00 2001 From: Lorenzo Fontana Date: Fri, 4 Jul 2025 12:16:05 +0200 Subject: [PATCH 4/4] chore(plugins/nf-seqera): move cluster logic to own class --- .../executor/SeqeraClusterHandler.groovy | 56 +++++++++++++++++++ .../io/seqera/executor/SeqeraExecutor.groovy | 21 ++----- .../seqera/executor/SeqeraTaskHandler.groovy | 6 +- 3 files changed, 65 insertions(+), 18 deletions(-) create mode 100644 plugins/nf-seqera/src/main/io/seqera/executor/SeqeraClusterHandler.groovy diff --git a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraClusterHandler.groovy b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraClusterHandler.groovy new file mode 100644 index 0000000000..e0c57731c4 --- /dev/null +++ b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraClusterHandler.groovy @@ -0,0 +1,56 @@ +/* + * Copyright 2013-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.seqera.executor + +import groovy.util.logging.Slf4j +import io.seqera.client.SeqeraClient + +/** + * Handles cluster management operations for Seqera executor + */ +@Slf4j +class SeqeraClusterHandler { + + private SeqeraClient client + private String clusterId + + SeqeraClusterHandler(SeqeraClient client) { + this.client = client + } + + void createCluster() { + log.debug "[SEQERA] Creating cluster for workflow" + final cluster = client.createCluster() + this.clusterId = cluster.clusterId + log.debug "[SEQERA] Cluster created id: " + cluster.clusterId + } + + void deleteCluster() { + if (!clusterId) { + return + } + log.debug "[SEQERA] Deleting cluster: " + clusterId + client.deleteCluster(this.clusterId) + log.debug "[SEQERA] Cluster id deleted" + + } + + String getClusterId() { + return clusterId + } +} \ No newline at end of file diff --git a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy index cf7f2914d5..2696b76854 100644 --- a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy +++ b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy @@ -40,34 +40,23 @@ class SeqeraExecutor extends Executor implements ExtensionPoint { private SeqeraClient client - private String clusterId + private SeqeraClusterHandler clusterHandler @Override protected void register() { createClient() - createCluster() + clusterHandler.createCluster() } @Override public void shutdown() { - deleteCluster() + clusterHandler.deleteCluster() } - protected void createCluster() { - log.debug "[SEQERA] Creating cluster for workflow" - final cluster = client.createCluster() - this.clusterId = cluster.clusterId - log.debug "[SEQERA] Cluster created id: " + cluster.clusterId - } - - protected void deleteCluster() { - log.debug "[SEQERA] Deleting cluster: " + clusterId - client.deleteCluster(this.clusterId) - log.debug "[SEQERA] Cluster id deleted" - } protected void createClient() { this.client = new SeqeraClient(session) + this.clusterHandler = new SeqeraClusterHandler(client) } @Override @@ -100,6 +89,6 @@ class SeqeraExecutor extends Executor implements ExtensionPoint { } String getClusterId() { - return clusterId + return clusterHandler.getClusterId() } } diff --git a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy index 0eab996976..cfb290bbae 100644 --- a/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy +++ b/plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy @@ -70,14 +70,16 @@ class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask { @Override void submit() { + int cpuRequest = task.config.getCpus() ?: 1 + long memoryRequest = task.config.getMemory() ? task.config.getMemory().toBytes() : 1024 * 1024 * 1024 final req = new CreateJobRequest() .withCommand(fusionSubmitCli()) .withImage(task.getContainer()) .withClusterId(executor.getClusterId()) .withEnvironment(fusionLauncher().fusionEnv()) .withPlatform(task.getContainerPlatform()) - .withCpus(task.config.getCpus()) - .withMemory(task.config.getMemory().toBytes()) + .withCpus(cpuRequest) + .withMemory(memoryRequest) log.debug "[SEQERA] Submitting job request=${req}" final resp = client.createJob(req) this.jobId = resp.jobId