Skip to content

Commit 798c65b

Browse files
pditommasofntlnz
authored andcommitted
Seqera scheduler first commit [ci fast]
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
1 parent 700fadd commit 798c65b

File tree

14 files changed

+789
-0
lines changed

14 files changed

+789
-0
lines changed

plugins/nf-seqera/build.gradle

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (c) 2019, Seqera Labs.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*
8+
* This Source Code Form is "Incompatible With Secondary Licenses", as
9+
* defined by the Mozilla Public License, v. 2.0.
10+
*/
11+
12+
apply plugin: 'java'
13+
apply plugin: 'java-test-fixtures'
14+
apply plugin: 'idea'
15+
apply plugin: 'groovy'
16+
17+
sourceSets {
18+
main.java.srcDirs = []
19+
main.groovy.srcDirs = ['src/main']
20+
main.resources.srcDirs = ['src/resources']
21+
test.groovy.srcDirs = ['src/test']
22+
test.java.srcDirs = []
23+
test.resources.srcDirs = []
24+
}
25+
26+
configurations {
27+
// see https://docs.gradle.org/4.1/userguide/dependency_management.html#sub:exclude_transitive_dependencies
28+
runtimeClasspath.exclude group: 'org.slf4j', module: 'slf4j-api'
29+
}
30+
31+
dependencies {
32+
compileOnly project(':nextflow')
33+
compileOnly 'org.slf4j:slf4j-api:2.0.16'
34+
compileOnly 'org.pf4j:pf4j:3.12.0'
35+
api 'io.seqera:sched-api:0.1.0'
36+
implementation 'com.squareup.moshi:moshi:1.15.2'
37+
implementation 'com.squareup.moshi:moshi-adapters:1.15.2'
38+
39+
testImplementation(testFixtures(project(":nextflow")))
40+
testImplementation "org.apache.groovy:groovy:4.0.26"
41+
testImplementation "org.apache.groovy:groovy-nio:4.0.26"
42+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright 2013-2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.client
19+
20+
import java.net.http.HttpClient
21+
import java.net.http.HttpRequest
22+
import java.net.http.HttpResponse
23+
import java.time.Duration
24+
import java.time.temporal.ChronoUnit
25+
import java.util.concurrent.Executors
26+
import java.util.function.Predicate
27+
28+
import dev.failsafe.Failsafe
29+
import dev.failsafe.RetryPolicy
30+
import dev.failsafe.event.EventListener
31+
import dev.failsafe.event.ExecutionAttemptedEvent
32+
import dev.failsafe.function.CheckedSupplier
33+
import groovy.json.JsonOutput
34+
import groovy.transform.CompileStatic
35+
import groovy.util.logging.Slf4j
36+
import io.seqera.config.SeqeraConfig
37+
import io.seqera.exception.BadResponseException
38+
import io.seqera.sched.api.v1a1.CancelJobResponse
39+
import io.seqera.sched.api.v1a1.CreateJobRequest
40+
import io.seqera.sched.api.v1a1.CreateJobResponse
41+
import io.seqera.sched.api.v1a1.DescribeJobResponse
42+
import io.seqera.sched.api.v1a1.GetJobLogsResponse
43+
import io.seqera.serde.Serde
44+
import io.seqera.util.trace.TraceUtils
45+
import nextflow.Session
46+
import nextflow.util.Threads
47+
/**
48+
*
49+
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
50+
*/
51+
@Slf4j
52+
@CompileStatic
53+
class SeqeraClient {
54+
55+
private HttpClient httpClient
56+
57+
private CookieManager cookieManager
58+
59+
private SeqeraConfig config
60+
61+
SeqeraClient(Session session) {
62+
this.config = new SeqeraConfig(session.config.seqera as Map ?: Collections.emptyMap())
63+
// the cookie manager
64+
cookieManager = new CookieManager()
65+
// create http client
66+
this.httpClient = newHttpClient()
67+
}
68+
69+
protected HttpClient newHttpClient() {
70+
final builder = HttpClient.newBuilder()
71+
.version(HttpClient.Version.HTTP_1_1)
72+
.followRedirects(HttpClient.Redirect.NORMAL)
73+
.connectTimeout(Duration.ofSeconds(10))
74+
.cookieHandler(cookieManager)
75+
// use virtual threads executor if enabled
76+
if( Threads.useVirtual() )
77+
builder.executor(Executors.newVirtualThreadPerTaskExecutor())
78+
// build and return the new client
79+
return builder.build()
80+
}
81+
82+
protected <R> R get0(String uri, Class<R> responseType) {
83+
final trace = TraceUtils.rndTrace()
84+
final req = HttpRequest.newBuilder()
85+
.uri(URI.create(uri))
86+
.headers('Content-Type','application/json', 'Traceparent', trace)
87+
.GET()
88+
.build()
89+
send0(req, responseType)
90+
}
91+
92+
protected <R> R delete0(String uri, Class<R> response) {
93+
final trace = TraceUtils.rndTrace()
94+
final req = HttpRequest.newBuilder()
95+
.uri(URI.create(uri))
96+
.headers('Content-Type','application/json', 'Traceparent', trace)
97+
.DELETE()
98+
.build()
99+
return send0(req, response)
100+
}
101+
102+
protected <T,R> R post0(String uri, T request, Class<R> response) {
103+
final trace = TraceUtils.rndTrace()
104+
final body = JsonOutput.toJson(request)
105+
final req = HttpRequest.newBuilder()
106+
.uri(URI.create(uri))
107+
.headers('Content-Type','application/json', 'Traceparent', trace)
108+
.POST(HttpRequest.BodyPublishers.ofString(body))
109+
.build()
110+
return send0(req, response)
111+
}
112+
113+
protected <T> T send0(HttpRequest req, Class<T> type) {
114+
final HttpResponse<String> resp = httpSend(req);
115+
log.debug("Seqera status response: statusCode={}; body={}", resp.statusCode(), resp.body())
116+
if( resp.statusCode()==200 ) {
117+
return Serde.fromJson(resp.body(), type)
118+
}
119+
else {
120+
final msg = "Seqera invalid response: ${req.method()} ${req.uri()} [${resp.statusCode()}] ${resp.body()}"
121+
throw new BadResponseException(msg)
122+
}
123+
}
124+
125+
CreateJobResponse createJob(CreateJobRequest request) {
126+
final uri = "${config.endpoint}/v1a1/jobs"
127+
return post0(uri, request, CreateJobResponse)
128+
}
129+
130+
DescribeJobResponse describeJob(String jobId) {
131+
final uri = "${config.endpoint}/v1a1/jobs/${jobId}"
132+
return get0(uri, DescribeJobResponse)
133+
}
134+
135+
CancelJobResponse cancelJob(String jobId) {
136+
final uri = "${config.endpoint}/v1a1/jobs/${jobId}"
137+
return delete0(uri, CancelJobResponse)
138+
}
139+
140+
GetJobLogsResponse getJobLogs(String jobId) {
141+
final uri = "${config.endpoint}/v1a1/jobs/${jobId}/logs"
142+
return get0(uri, GetJobLogsResponse)
143+
}
144+
145+
protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond, Predicate<T> handle) {
146+
final cfg = config.retryOpts()
147+
final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
148+
@Override
149+
void accept(ExecutionAttemptedEvent event) throws Throwable {
150+
def msg = "Seqera connection failure - attempt: ${event.attemptCount}"
151+
if( event.lastResult!=null )
152+
msg += "; response: ${event.lastResult}"
153+
if( event.lastFailure != null )
154+
msg += "; exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}"
155+
log.debug(msg)
156+
}
157+
}
158+
return RetryPolicy.<T>builder()
159+
.handleIf(cond)
160+
.handleResultIf(handle)
161+
.withBackoff(cfg.delay.toMillis(), cfg.maxDelay.toMillis(), ChronoUnit.MILLIS)
162+
.withMaxAttempts(cfg.maxAttempts)
163+
.withJitter(cfg.jitter)
164+
.onRetry(listener)
165+
.build()
166+
}
167+
168+
protected <T> HttpResponse<T> safeApply(CheckedSupplier action) {
169+
final retryOnException = (e -> e instanceof IOException) as Predicate<? extends Throwable>
170+
final retryOnStatusCode = ((HttpResponse<T> resp) -> resp.statusCode() in SERVER_ERRORS) as Predicate<HttpResponse<T>>
171+
final policy = retryPolicy(retryOnException, retryOnStatusCode)
172+
return Failsafe.with(policy).get(action)
173+
}
174+
175+
static private final List<Integer> SERVER_ERRORS = [429,500,502,503,504]
176+
177+
protected HttpResponse<String> httpSend(HttpRequest req) {
178+
return safeApply(() -> httpClient.send(req, HttpResponse.BodyHandlers.ofString()))
179+
}
180+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2013-2024, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.config
19+
20+
import groovy.transform.CompileStatic
21+
import groovy.transform.ToString
22+
import nextflow.util.Duration
23+
24+
/**
25+
* Model retry options for Wave http requests
26+
*/
27+
@ToString(includeNames = true, includePackage = false)
28+
@CompileStatic
29+
class RetryOpts {
30+
Duration delay = Duration.of('450ms')
31+
Duration maxDelay = Duration.of('90s')
32+
int maxAttempts = 10
33+
double jitter = 0.25
34+
35+
RetryOpts() {
36+
this(Collections.emptyMap())
37+
}
38+
39+
RetryOpts(Map config) {
40+
if( config.delay )
41+
delay = config.delay as Duration
42+
if( config.maxDelay )
43+
maxDelay = config.maxDelay as Duration
44+
if( config.maxAttempts )
45+
maxAttempts = config.maxAttempts as int
46+
if( config.jitter )
47+
jitter = config.jitter as double
48+
}
49+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2013-2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.config
19+
20+
import groovy.transform.CompileStatic
21+
22+
/**
23+
*
24+
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
25+
*/
26+
@CompileStatic
27+
class SeqeraConfig {
28+
29+
private RetryOpts retryOpts
30+
31+
private String endpoint
32+
33+
SeqeraConfig(Map opts) {
34+
this.retryOpts = new RetryOpts(opts.retryPolicy as Map ?: Map.of())
35+
this.endpoint = opts.endpoint as String
36+
if( !endpoint )
37+
throw new IllegalArgumentException("Missing Seqera endpoint - make sure to specify 'seqera.endpoint' settings")
38+
}
39+
40+
RetryOpts retryOpts() {
41+
this.retryOpts
42+
}
43+
44+
String getEndpoint() {
45+
return endpoint
46+
}
47+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2013-2024, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.exception
19+
20+
import groovy.transform.InheritConstructors
21+
22+
/**
23+
* Model an invalid HTTP response
24+
*
25+
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
26+
*/
27+
@InheritConstructors
28+
class BadResponseException extends RuntimeException {
29+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2013-2024, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.exception
19+
20+
import groovy.transform.InheritConstructors
21+
22+
/**
23+
* Unauthorized access exception
24+
*
25+
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
26+
*/
27+
@InheritConstructors
28+
class UnauthorizedException extends RuntimeException {
29+
}

0 commit comments

Comments
 (0)