diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy
new file mode 100644
index 0000000000..4d4cf01973
--- /dev/null
+++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2025, Seqera
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.cloud.azure.batch
+
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import nextflow.Session
+import nextflow.processor.TaskProcessor
+import nextflow.trace.TraceObserver
+
+/**
+ * Trace observer for the Azure Batch executor. On each process-termination
+ * event it looks up the Azure Batch jobs registered for that process and asks
+ * the batch service to flag every one of them for auto-termination once all
+ * of its tasks complete.
+ *
+ * @author Adam Talbot
+ */
+@Slf4j
+@CompileStatic
+class AzBatchProcessObserver implements TraceObserver {
+
+    AzBatchProcessObserver(Session session) {
+        // The session is unused; the argument exists because the factory passes it in
+    }
+
+    /**
+     * Handles the termination event of a process. Processes that do not run on
+     * {@link AzBatchExecutor}, and runs where {@code terminateJobsOnCompletion}
+     * is not enabled, are ignored.
+     *
+     * @param processor the process that has terminated
+     */
+    @Override
+    void onProcessTerminate(TaskProcessor processor) {
+        // Nothing to do unless the process is backed by the Azure Batch executor
+        if( !(processor.executor instanceof AzBatchExecutor) )
+            return
+
+        final batchService = (processor.executor as AzBatchExecutor).batchService
+        // Respect the configuration switch for job auto-termination
+        if( !batchService?.config?.batch()?.terminateJobsOnCompletion ) {
+            log.trace "Azure Batch job auto-termination is disabled, skipping eager termination for process: ${processor.name}"
+            return
+        }
+
+        // Collect the jobs keyed by this processor first, then flag them one at a time
+        final ownJobs = batchService.allJobIds.findAll { key, id -> key.processor == processor }
+        for( String jobId : ownJobs.values() ) {
+            log.debug "Setting Azure Batch job ${jobId} to auto-terminate for completed process: ${processor.name}"
+            try {
+                batchService.setJobAutoTermination(jobId)
+            }
+            catch( Exception e ) {
+                // best effort: a failure on one job must not prevent handling the others
+                log.warn "Failed to set auto-termination for Azure Batch job ${jobId} associated with process '${processor.name}' - ${e.message ?: e}"
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy
new file mode 100644
index 0000000000..87b2474aee
--- /dev/null
+++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2025, Seqera
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.cloud.azure.batch
+
+import groovy.transform.CompileStatic
+import nextflow.Session
+import nextflow.trace.TraceObserver
+import nextflow.trace.TraceObserverFactory
+
+/**
+ * Factory for creating the Azure Batch process observer that enables eager termination
+ * of Azure Batch jobs when processes complete.
+ *
+ * @author Adam Talbot
+ */
+@CompileStatic
+class AzBatchProcessObserverFactory implements TraceObserverFactory {
+    /** Creates the observer for the given session; the returned collection holds exactly one element. */
+    @Override
+    Collection<TraceObserver> create(Session session) {
+        return [new AzBatchProcessObserver(session)]
+    }
+}
\ No newline at end of file
diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy
index 5d70466656..cfafd38edc 100644
--- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy
+++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy
@@ -977,30 +977,53 @@ class AzBatchService implements Closeable {
     }
 
     /**
-     * Set all jobs to terminate on completion.
+     * Set a specific Azure Batch job to terminate when all tasks complete.
+     * This is called eagerly when a Nextflow process completes (all tasks submitted)
+     * rather than waiting for the entire pipeline to finish.
+     *
+     * @param jobId The Azure Batch job ID to set for auto-termination
      */
-    protected void terminateJobs() {
-        for( String jobId : allJobIds.values() ) {
-            try {
-                log.trace "Setting Azure job ${jobId} to terminate on completion"
+    void setJobAutoTermination(String jobId) {
+        setJobTermination(jobId)
+    }
 
-                final job = apply(() -> client.getJob(jobId))
-                final poolInfo = job.poolInfo
+    /**
+     * Set a job to terminate when all tasks complete.
+     * Any exception raised while doing so is logged and not propagated to the caller.
+     * @param jobId identifier of the Azure Batch job to update
+     */
+    protected void setJobTermination(String jobId) {
+        try {
+            log.trace "Setting Azure job ${jobId} to terminate on completion"
 
-                final jobParameter = new BatchJobUpdateContent()
-                    .setOnAllTasksComplete(OnAllBatchTasksComplete.TERMINATE_JOB)
-                    .setPoolInfo(poolInfo)
+            // read the job's current pool info so it can be sent back unchanged with the update
+            final currentPool = apply(() -> client.getJob(jobId)).poolInfo
 
-                apply(() -> client.updateJob(jobId, jobParameter))
-            }
-            catch (HttpResponseException e) {
-                if (e.response.statusCode == 409) {
-                    log.debug "Azure Batch job ${jobId} already terminated, skipping termination"
-                } else {
-                    log.warn "Unable to terminate Azure Batch job ${jobId} - Status: ${e.response.statusCode}, Reason: ${e.message ?: e}"
-                }
+            final update = new BatchJobUpdateContent()
+                .setOnAllTasksComplete(OnAllBatchTasksComplete.TERMINATE_JOB)
+                .setPoolInfo(currentPool)
+            apply(() -> client.updateJob(jobId, update))
+        }
+        catch (HttpResponseException e) {
+            // a 409 response is taken to mean the job is already terminated, so it only rates a debug line
+            if (e.response.statusCode != 409) {
+                log.warn "Unable to set auto-termination for Azure Batch job ${jobId} - Status: ${e.response.statusCode}, Reason: ${e.message ?: e}"
+            } else {
+                log.debug "Azure Batch job ${jobId} already terminated, skipping auto-termination setup"
             }
         }
+        catch (Exception e) {
+            log.warn "Unable to set auto-termination for Azure Batch job ${jobId} - Reason: ${e.message ?: e}"
+        }
+    }
+
+    /**
+     * Set all jobs to terminate on completion.
+     */
+    protected void terminateJobs() {
+        // every known job goes through the per-job routine, which logs its own failures
+        final jobIds = allJobIds.values()
+        jobIds.each { String jobId -> setJobTermination(jobId) }
     }
 
     protected void cleanupJobs() {
diff --git a/plugins/nf-azure/src/resources/META-INF/extensions.idx b/plugins/nf-azure/src/resources/META-INF/extensions.idx
index 685c239bca..c9897c5471 100644
--- a/plugins/nf-azure/src/resources/META-INF/extensions.idx
+++ b/plugins/nf-azure/src/resources/META-INF/extensions.idx
@@ -15,6 +15,7 @@
 #
 
 nextflow.cloud.azure.batch.AzBatchExecutor
+nextflow.cloud.azure.batch.AzBatchProcessObserverFactory
 nextflow.cloud.azure.file.AzPathFactory
 nextflow.cloud.azure.file.AzPathSerializer
 nextflow.cloud.azure.fusion.AzFusionEnv
diff --git a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy
new file mode 100644
index 0000000000..1427651f47
--- /dev/null
+++ b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2025, Seqera
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.cloud.azure.batch
+
+import nextflow.Session
+import nextflow.cloud.azure.config.AzBatchOpts
+import nextflow.cloud.azure.config.AzConfig
+import nextflow.executor.Executor
+import nextflow.processor.TaskProcessor
+import spock.lang.Specification
+
+/**
+ * Unit tests for {@link AzBatchProcessObserver}.
+ *
+ * @author Adam Talbot
+ */
+class AzBatchProcessObserverTest extends Specification {
+
+    def 'should only act on Azure Batch executors'() {
+        given:
+        // a generic executor, i.e. deliberately not an AzBatchExecutor
+        def plainExecutor = Mock(Executor)
+        def process = Mock(TaskProcessor) { getExecutor() >> plainExecutor }
+        def observer = new AzBatchProcessObserver(Mock(Session))
+
+        when:
+        observer.onProcessTerminate(process)
+
+        then:
+        noExceptionThrown()
+    }
+
+    def 'should set job auto-termination when enabled'() {
+        given:
+        def process = Mock(TaskProcessor) { getName() >> 'test-process' }
+        // configuration with job auto-termination switched on
+        def batchOpts = Mock(AzBatchOpts) { terminateJobsOnCompletion >> true }
+        def azConfig = Mock(AzConfig) { batch() >> batchOpts }
+        // a service tracking a single job keyed by the process under test
+        def service = Mock(AzBatchService) {
+            getConfig() >> azConfig
+            getAllJobIds() >> [(new AzJobKey(process, 'pool1')): 'job123']
+        }
+        def azExecutor = Mock(AzBatchExecutor) { getBatchService() >> service }
+        process.getExecutor() >> azExecutor
+        def observer = new AzBatchProcessObserver(Mock(Session))
+
+        when:
+        observer.onProcessTerminate(process)
+
+        then:
+        1 * service.setJobAutoTermination('job123')
+    }
+
+    def 'should skip when termination disabled'() {
+        given:
+        // configuration with job auto-termination switched off
+        def batchOpts = Mock(AzBatchOpts) { terminateJobsOnCompletion >> false }
+        def azConfig = Mock(AzConfig) { batch() >> batchOpts }
+        def service = Mock(AzBatchService) {
+            getConfig() >> azConfig
+        }
+        // process wired to an Azure Batch executor backed by that service
+        def process = Mock(TaskProcessor) { getName() >> 'test-process' }
+        def azExecutor = Mock(AzBatchExecutor) { getBatchService() >> service }
+        process.getExecutor() >> azExecutor
+        def observer = new AzBatchProcessObserver(Mock(Session))
+
+        when:
+        observer.onProcessTerminate(process)
+
+        then:
+        0 * service.setJobAutoTermination(_)
+    }
+}
\ No newline at end of file