Skip to content

Commit 58a68c0

Browse files
committed
Seqera scheduler create cluster [ci fast]
1 parent 798c65b commit 58a68c0

File tree

4 files changed

+79
-17
lines changed

4 files changed

+79
-17
lines changed

plugins/nf-seqera/src/main/io/seqera/client/SeqeraClient.groovy

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,6 @@
1717

1818
package io.seqera.client
1919

20-
import java.net.http.HttpClient
21-
import java.net.http.HttpRequest
22-
import java.net.http.HttpResponse
23-
import java.time.Duration
24-
import java.time.temporal.ChronoUnit
25-
import java.util.concurrent.Executors
26-
import java.util.function.Predicate
27-
2820
import dev.failsafe.Failsafe
2921
import dev.failsafe.RetryPolicy
3022
import dev.failsafe.event.EventListener
@@ -35,15 +27,20 @@ import groovy.transform.CompileStatic
3527
import groovy.util.logging.Slf4j
3628
import io.seqera.config.SeqeraConfig
3729
import io.seqera.exception.BadResponseException
38-
import io.seqera.sched.api.v1a1.CancelJobResponse
39-
import io.seqera.sched.api.v1a1.CreateJobRequest
40-
import io.seqera.sched.api.v1a1.CreateJobResponse
41-
import io.seqera.sched.api.v1a1.DescribeJobResponse
42-
import io.seqera.sched.api.v1a1.GetJobLogsResponse
30+
import io.seqera.sched.api.v1a1.*
4331
import io.seqera.serde.Serde
4432
import io.seqera.util.trace.TraceUtils
4533
import nextflow.Session
4634
import nextflow.util.Threads
35+
36+
import java.net.http.HttpClient
37+
import java.net.http.HttpRequest
38+
import java.net.http.HttpResponse
39+
import java.time.Duration
40+
import java.time.temporal.ChronoUnit
41+
import java.util.concurrent.Executors
42+
import java.util.function.Predicate
43+
4744
/**
4845
*
4946
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
@@ -99,6 +96,17 @@ class SeqeraClient {
9996
return send0(req, response)
10097
}
10198

99+
protected <T,R> R delete0WithBody(String uri, T request, Class<R> response) {
100+
final trace = TraceUtils.rndTrace()
101+
final body = JsonOutput.toJson(request)
102+
final req = HttpRequest.newBuilder()
103+
.uri(URI.create(uri))
104+
.headers('Content-Type','application/json', 'Traceparent', trace)
105+
.method("DELETE", HttpRequest.BodyPublishers.ofString(body))
106+
.build()
107+
return send0(req, response)
108+
}
109+
102110
protected <T,R> R post0(String uri, T request, Class<R> response) {
103111
final trace = TraceUtils.rndTrace()
104112
final body = JsonOutput.toJson(request)
@@ -142,6 +150,29 @@ class SeqeraClient {
142150
return get0(uri, GetJobLogsResponse)
143151
}
144152

153+
CreateClusterResponse createCluster() {
154+
final uri = "${config.endpoint}/v1a1/cluster"
155+
final request = [region: this.config.getRegion(), keyName: this.config.getKeyPairName()]
156+
return post0(uri, request, CreateClusterResponse)
157+
}
158+
159+
Map getClusterInfo(String clusterId) {
160+
final uri = "${config.endpoint}/v1a1/cluster/${clusterId}"
161+
return get0(uri, Map)
162+
}
163+
164+
Map deleteCluster(String clusterId) {
165+
final uri = "${config.endpoint}/v1a1/cluster"
166+
final request = [clusterId: clusterId, region: this.config.getRegion()]
167+
return delete0WithBody(uri, request, Map)
168+
}
169+
170+
Map addNodeToCluster(String clusterId, String instanceType) {
171+
final uri = "${config.endpoint}/v1a1/cluster/${clusterId}/add"
172+
final request = [clusterId: clusterId, instanceType: instanceType]
173+
return post0(uri, request, Map)
174+
}
175+
145176
protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond, Predicate<T> handle) {
146177
final cfg = config.retryOpts()
147178
final listener = new EventListener<ExecutionAttemptedEvent<T>>() {

plugins/nf-seqera/src/main/io/seqera/config/SeqeraConfig.groovy

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,21 @@ class SeqeraConfig {
3030

3131
private String endpoint
3232

33+
private String region
34+
35+
private String keyPairName
36+
3337
SeqeraConfig(Map opts) {
3438
this.retryOpts = new RetryOpts(opts.retryPolicy as Map ?: Map.of())
3539
this.endpoint = opts.endpoint as String
36-
if( !endpoint )
40+
if (!endpoint)
3741
throw new IllegalArgumentException("Missing Seqera endpoint - make sure to specify 'seqera.endpoint' settings")
42+
43+
this.region = opts.region as String
44+
if (!region)
45+
region = "eu-central-1"
46+
47+
this.keyPairName = opts.keyPairName as String
3848
}
3949

4050
RetryOpts retryOpts() {
@@ -44,4 +54,12 @@ class SeqeraConfig {
4454
String getEndpoint() {
4555
return endpoint
4656
}
57+
58+
String getRegion() {
59+
return region
60+
}
61+
62+
String getKeyPairName() {
63+
return keyPairName
64+
}
4765
}

plugins/nf-seqera/src/main/io/seqera/executor/SeqeraExecutor.groovy

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package io.seqera.executor
1919

20+
import groovy.util.logging.Slf4j
2021
import io.seqera.client.SeqeraClient
2122
import nextflow.exception.AbortOperationException
2223
import nextflow.executor.Executor
@@ -33,18 +34,25 @@ import org.pf4j.ExtensionPoint
3334
*
3435
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
3536
*/
36-
@ServiceName("seqera")
37+
@Slf4j
38+
@ServiceName('seqera')
3739
class SeqeraExecutor extends Executor implements ExtensionPoint {
3840

3941
private SeqeraClient client
4042

43+
private String clusterId
44+
4145
@Override
4246
protected void register() {
4347
createClient()
4448
}
4549

4650
protected void createClient() {
4751
this.client = new SeqeraClient(session)
52+
log.debug "[SEQERA] Creating cluster for workflow"
53+
final cluster = client.createCluster()
54+
this.clusterId = cluster.clusterId
55+
log.debug "[SEQERA] Cluster created id: " + cluster.clusterId
4856
}
4957

5058
@Override
@@ -54,7 +62,7 @@ class SeqeraExecutor extends Executor implements ExtensionPoint {
5462

5563
@Override
5664
TaskHandler createTaskHandler(TaskRun task) {
57-
return new SeqeraTaskHandler(task, this)
65+
return new SeqeraTaskHandler(task, this)
5866
}
5967

6068
/**
@@ -67,12 +75,16 @@ class SeqeraExecutor extends Executor implements ExtensionPoint {
6775
@Override
6876
boolean isFusionEnabled() {
6977
final enabled = FusionHelper.isFusionEnabled(session)
70-
if( !enabled )
78+
if (!enabled)
7179
throw new AbortOperationException("Seqera executor requires the use of Fusion file system")
7280
return true
7381
}
7482

7583
SeqeraClient getClient() {
7684
return client
7785
}
86+
87+
String getClusterId() {
88+
return clusterId
89+
}
7890
}

plugins/nf-seqera/src/main/io/seqera/executor/SeqeraTaskHandler.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask {
7272
final req = new CreateJobRequest()
7373
.withCommand(fusionSubmitCli())
7474
.withImage(task.getContainer())
75+
.withClusterId(executor.getClusterId())
7576
.withEnvironment(fusionLauncher().fusionEnv())
7677
.withPlatform(task.getContainerPlatform())
7778
log.debug "[SEQERA] Submitting job request=${req}"

0 commit comments

Comments
 (0)