Skip to content

Sched core implementation #6242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions plugins/nf-seqera/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2019, Seqera Labs.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* This Source Code Form is "Incompatible With Secondary Licenses", as
* defined by the Mozilla Public License, v. 2.0.
*/

apply plugin: 'java'
apply plugin: 'java-test-fixtures'
apply plugin: 'idea'
apply plugin: 'groovy'

sourceSets {
main.java.srcDirs = []
main.groovy.srcDirs = ['src/main']
main.resources.srcDirs = ['src/resources']
test.groovy.srcDirs = ['src/test']
test.java.srcDirs = []
test.resources.srcDirs = []
}

configurations {
// see https://docs.gradle.org/4.1/userguide/dependency_management.html#sub:exclude_transitive_dependencies
runtimeClasspath.exclude group: 'org.slf4j', module: 'slf4j-api'
}

dependencies {
compileOnly project(':nextflow')
compileOnly 'org.slf4j:slf4j-api:2.0.16'
compileOnly 'org.pf4j:pf4j:3.12.0'
api 'io.seqera:sched-api:0.1.0'
implementation 'com.squareup.moshi:moshi:1.15.2'
implementation 'com.squareup.moshi:moshi-adapters:1.15.2'

testImplementation(testFixtures(project(":nextflow")))
testImplementation "org.apache.groovy:groovy:4.0.26"
testImplementation "org.apache.groovy:groovy-nio:4.0.26"
}
211 changes: 211 additions & 0 deletions plugins/nf-seqera/src/main/io/seqera/client/SeqeraClient.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Copyright 2013-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.seqera.client

import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier
import groovy.json.JsonOutput
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.config.SeqeraConfig
import io.seqera.exception.BadResponseException
import io.seqera.sched.api.v1a1.*
import io.seqera.serde.Serde
import io.seqera.util.trace.TraceUtils
import nextflow.Session
import nextflow.util.Threads

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.Executors
import java.util.function.Predicate

/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
@CompileStatic
class SeqeraClient {

private HttpClient httpClient

private CookieManager cookieManager

private SeqeraConfig config

SeqeraClient(Session session) {
this.config = new SeqeraConfig(session.config.seqera as Map ?: Collections.emptyMap())
// the cookie manager
cookieManager = new CookieManager()
// create http client
this.httpClient = newHttpClient()
}

protected HttpClient newHttpClient() {
final builder = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(10))
.cookieHandler(cookieManager)
// use virtual threads executor if enabled
if( Threads.useVirtual() )
builder.executor(Executors.newVirtualThreadPerTaskExecutor())
// build and return the new client
return builder.build()
}

protected <R> R get0(String uri, Class<R> responseType) {
final trace = TraceUtils.rndTrace()
final req = HttpRequest.newBuilder()
.uri(URI.create(uri))
.headers('Content-Type','application/json', 'Traceparent', trace)
.GET()
.build()
send0(req, responseType)
}

protected <R> R delete0(String uri, Class<R> response) {
final trace = TraceUtils.rndTrace()
final req = HttpRequest.newBuilder()
.uri(URI.create(uri))
.headers('Content-Type','application/json', 'Traceparent', trace)
.DELETE()
.build()
return send0(req, response)
}

protected <T,R> R delete0WithBody(String uri, T request, Class<R> response) {
final trace = TraceUtils.rndTrace()
final body = JsonOutput.toJson(request)
final req = HttpRequest.newBuilder()
.uri(URI.create(uri))
.headers('Content-Type','application/json', 'Traceparent', trace)
.method("DELETE", HttpRequest.BodyPublishers.ofString(body))
.build()
return send0(req, response)
}

protected <T,R> R post0(String uri, T request, Class<R> response) {
final trace = TraceUtils.rndTrace()
final body = JsonOutput.toJson(request)
final req = HttpRequest.newBuilder()
.uri(URI.create(uri))
.headers('Content-Type','application/json', 'Traceparent', trace)
.POST(HttpRequest.BodyPublishers.ofString(body))
.build()
return send0(req, response)
}

protected <T> T send0(HttpRequest req, Class<T> type) {
final HttpResponse<String> resp = httpSend(req);
log.debug("Seqera status response: statusCode={}; body={}", resp.statusCode(), resp.body())
if( resp.statusCode()==200 ) {
return Serde.fromJson(resp.body(), type)
}
else {
final msg = "Seqera invalid response: ${req.method()} ${req.uri()} [${resp.statusCode()}] ${resp.body()}"
throw new BadResponseException(msg)
}
}

CreateJobResponse createJob(CreateJobRequest request) {
final uri = "${config.endpoint}/v1a1/jobs"
return post0(uri, request, CreateJobResponse)
}

DescribeJobResponse describeJob(String jobId) {
final uri = "${config.endpoint}/v1a1/jobs/${jobId}"
return get0(uri, DescribeJobResponse)
}

CancelJobResponse cancelJob(String jobId) {
final uri = "${config.endpoint}/v1a1/jobs/${jobId}"
return delete0(uri, CancelJobResponse)
}

GetJobLogsResponse getJobLogs(String jobId) {
final uri = "${config.endpoint}/v1a1/jobs/${jobId}/logs"
return get0(uri, GetJobLogsResponse)
}

CreateClusterResponse createCluster() {
final uri = "${config.endpoint}/v1a1/cluster"
final request = [region: this.config.getRegion(), keyName: this.config.getKeyPairName()]
return post0(uri, request, CreateClusterResponse)
}

Map getClusterInfo(String clusterId) {
final uri = "${config.endpoint}/v1a1/cluster/${clusterId}"
return get0(uri, Map)
}

Map deleteCluster(String clusterId) {
final uri = "${config.endpoint}/v1a1/cluster"
final request = [clusterId: clusterId, region: this.config.getRegion()]
return delete0WithBody(uri, request, Map)
}

Map addNodeToCluster(String clusterId, String instanceType) {
final uri = "${config.endpoint}/v1a1/cluster/${clusterId}/add"
final request = [clusterId: clusterId, instanceType: instanceType]
return post0(uri, request, Map)
}

protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond, Predicate<T> handle) {
final cfg = config.retryOpts()
final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
@Override
void accept(ExecutionAttemptedEvent event) throws Throwable {
def msg = "Seqera connection failure - attempt: ${event.attemptCount}"
if( event.lastResult!=null )
msg += "; response: ${event.lastResult}"
if( event.lastFailure != null )
msg += "; exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}"
log.debug(msg)
}
}
return RetryPolicy.<T>builder()
.handleIf(cond)
.handleResultIf(handle)
.withBackoff(cfg.delay.toMillis(), cfg.maxDelay.toMillis(), ChronoUnit.MILLIS)
.withMaxAttempts(cfg.maxAttempts)
.withJitter(cfg.jitter)
.onRetry(listener)
.build()
}

protected <T> HttpResponse<T> safeApply(CheckedSupplier action) {
final retryOnException = (e -> e instanceof IOException) as Predicate<? extends Throwable>
final retryOnStatusCode = ((HttpResponse<T> resp) -> resp.statusCode() in SERVER_ERRORS) as Predicate<HttpResponse<T>>
final policy = retryPolicy(retryOnException, retryOnStatusCode)
return Failsafe.with(policy).get(action)
}

static private final List<Integer> SERVER_ERRORS = [429,500,502,503,504]

protected HttpResponse<String> httpSend(HttpRequest req) {
return safeApply(() -> httpClient.send(req, HttpResponse.BodyHandlers.ofString()))
}
}
49 changes: 49 additions & 0 deletions plugins/nf-seqera/src/main/io/seqera/config/RetryOpts.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.seqera.config

import groovy.transform.CompileStatic
import groovy.transform.ToString
import nextflow.util.Duration

/**
* Model retry options for Wave http requests
*/
@ToString(includeNames = true, includePackage = false)
@CompileStatic
class RetryOpts {
Duration delay = Duration.of('450ms')
Duration maxDelay = Duration.of('90s')
int maxAttempts = 10
double jitter = 0.25

RetryOpts() {
this(Collections.emptyMap())
}

RetryOpts(Map config) {
if( config.delay )
delay = config.delay as Duration
if( config.maxDelay )
maxDelay = config.maxDelay as Duration
if( config.maxAttempts )
maxAttempts = config.maxAttempts as int
if( config.jitter )
jitter = config.jitter as double
}
}
65 changes: 65 additions & 0 deletions plugins/nf-seqera/src/main/io/seqera/config/SeqeraConfig.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2013-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.seqera.config

import groovy.transform.CompileStatic

/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@CompileStatic
class SeqeraConfig {

private RetryOpts retryOpts

private String endpoint

private String region

private String keyPairName

SeqeraConfig(Map opts) {
this.retryOpts = new RetryOpts(opts.retryPolicy as Map ?: Map.of())
this.endpoint = opts.endpoint as String
if (!endpoint)
throw new IllegalArgumentException("Missing Seqera endpoint - make sure to specify 'seqera.endpoint' settings")

this.region = opts.region as String
if (!region)
region = "eu-central-1"

this.keyPairName = opts.keyPairName as String
}

RetryOpts retryOpts() {
this.retryOpts
}

String getEndpoint() {
return endpoint
}

String getRegion() {
return region
}

String getKeyPairName() {
return keyPairName
}
}
Loading
Loading