Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/controllers/concerns/workflow_execution_actions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ def destroy # rubocop:disable Metrics/MethodLength
end

def cancel # rubocop:disable Metrics/MethodLength
WorkflowExecutions::CancelService.new(@workflow_execution, current_user).execute
result = WorkflowExecutions::CancelService.new(@workflow_execution, current_user).execute

respond_to do |format|
format.turbo_stream do
if @workflow_execution.canceled? || @workflow_execution.canceling?
if result && (@workflow_execution.canceled? || @workflow_execution.canceling?)
render status: :ok,
locals: { type: 'success',
message: t('concerns.workflow_execution_actions.cancel.success',
Expand Down
8 changes: 7 additions & 1 deletion app/jobs/workflow_execution_cancelation_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ def perform(workflow_execution, user)
end

wes_connection = Integrations::Ga4ghWesApi::V1::ApiConnection.new.conn
WorkflowExecutions::CancelationService.new(workflow_execution, wes_connection, user).execute
result = WorkflowExecutions::CancelationService.new(workflow_execution, wes_connection, user).execute

if result
WorkflowExecutionCleanupJob.perform_later(workflow_execution)
else
handle_unable_to_process_job(workflow_execution, self.class.name) unless result
end
end
end
4 changes: 2 additions & 2 deletions app/jobs/workflow_execution_cleanup_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def perform(workflow_execution)
workflow_execution.canceled? ||
workflow_execution.error?

# TODO: early return from cleanup service unhandled
workflow_execution = WorkflowExecutions::CleanupService.new(workflow_execution).execute # rubocop:disable Lint/UselessAssignment
# Final stage of WorkflowExecution life cycle. No further work required.
WorkflowExecutions::CleanupService.new(workflow_execution).execute
end
end
10 changes: 6 additions & 4 deletions app/jobs/workflow_execution_completion_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ def perform(workflow_execution)
return handle_error_state_and_clean(workflow_execution)
end

WorkflowExecutions::CompletionService.new(workflow_execution).execute

# TODO: this job doesn't have any tests. Only the service has tests.
# Tests should be added when job queuing from CompletionService is refactored into this job.
result = WorkflowExecutions::CompletionService.new(workflow_execution).execute
if result
WorkflowExecutionCleanupJob.perform_later(workflow_execution)
else
handle_unable_to_process_job(workflow_execution, self.class.name)
end
end
end
6 changes: 6 additions & 0 deletions app/jobs/workflow_execution_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ def handle_error_state_and_clean(workflow_execution)
workflow_execution.update_attribute('state', :error) # rubocop:disable Rails/SkipsModelValidations
WorkflowExecutionCleanupJob.perform_later(workflow_execution)
end

def handle_unable_to_process_job(workflow_execution, job_name)
workflow_execution.errors.add(:base,
I18n.t('activerecord.errors.models.workflow_execution.invalid_job_state', job_name:))
handle_error_state_and_clean(workflow_execution)
end
end
9 changes: 1 addition & 8 deletions app/jobs/workflow_execution_preparation_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,12 @@ def perform(workflow_execution)
return handle_error_state_and_clean(workflow_execution)
end

# TODO: the service returning false is actually used correctly here
# When the other jobs/services early returns are refactored this should also be included.
result = WorkflowExecutions::PreparationService.new(workflow_execution).execute

if result
WorkflowExecutionSubmissionJob.perform_later(workflow_execution)
else
@workflow_execution.state = :error
@workflow_execution.cleaned = true
@workflow_execution.save
handle_unable_to_process_job(workflow_execution, self.class.name)
end

# TODO: this job doesn't have any tests. Only the service has tests.
# Tests should be added when job queuing from CompletionService is refactored into this job.
end
end
8 changes: 8 additions & 0 deletions app/jobs/workflow_execution_status_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ def perform(workflow_execution)
wes_connection = Integrations::Ga4ghWesApi::V1::ApiConnection.new.conn
workflow_execution = WorkflowExecutions::StatusService.new(workflow_execution, wes_connection).execute

if workflow_execution
queue_next_job(workflow_execution)
else
handle_unable_to_process_job(workflow_execution, self.class.name)
end
end

def queue_next_job(workflow_execution)
case workflow_execution.state.to_sym
when :canceled, :error
WorkflowExecutionCleanupJob.perform_later(workflow_execution)
Expand Down
6 changes: 5 additions & 1 deletion app/jobs/workflow_execution_submission_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def perform(workflow_execution)
wes_connection = Integrations::Ga4ghWesApi::V1::ApiConnection.new.conn
workflow_execution = WorkflowExecutions::SubmissionService.new(workflow_execution, wes_connection).execute

WorkflowExecutionStatusJob.set(wait_until: 30.seconds.from_now).perform_later(workflow_execution)
if workflow_execution
WorkflowExecutionStatusJob.set(wait_until: 30.seconds.from_now).perform_later(workflow_execution)
else
handle_unable_to_process_job(workflow_execution, self.class.name)
end
end
end
5 changes: 1 addition & 4 deletions app/services/workflow_executions/cancelation_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize(workflow_execution, wes_connection, user = nil, params = {})
end

def execute
return false unless @workflow_execution.canceling? # TODO: returning false needs rework
return false unless @workflow_execution.canceling?

@wes_client.cancel_run(@workflow_execution.run_id)

Expand All @@ -19,9 +19,6 @@ def execute

@workflow_execution.save

# TODO: job queuing should be handled by the job that called this service
WorkflowExecutionCleanupJob.perform_later(@workflow_execution)

@workflow_execution
end
end
Expand Down
2 changes: 1 addition & 1 deletion app/services/workflow_executions/cleanup_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def initialize(workflow_execution, user = nil, params = {})
end

def execute
return false if @workflow_execution.nil? || @workflow_execution.cleaned? # TODO: returning false needs rework
return if @workflow_execution.nil? || @workflow_execution.cleaned? # no further work needed

# This check is for safety as passing nil or an empty string into deleted_prefixed will delete all blobs
if @workflow_execution.blob_run_directory.present?
Expand Down
5 changes: 1 addition & 4 deletions app/services/workflow_executions/completion_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def initialize(workflow_execution, params = {})
end

def execute # rubocop:disable Metrics/MethodLength
return false unless @workflow_execution.completing? # TODO: returning false needs rework
return false unless @workflow_execution.completing?

run_output_data = download_decompress_parse_gziped_json("#{@output_base_path}iridanext.output.json.gz")

Expand Down Expand Up @@ -45,9 +45,6 @@ def execute # rubocop:disable Metrics/MethodLength

@workflow_execution.save

# TODO: job queuing should be handled by the job that called this service
WorkflowExecutionCleanupJob.perform_later(@workflow_execution)

@workflow_execution
end

Expand Down
2 changes: 1 addition & 1 deletion app/services/workflow_executions/preparation_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def initialize(workflow_execution, user = nil, params = {})

def execute # rubocop:disable Metrics/MethodLength
# confirm pipeline found
return false unless validate_pipeline # TODO: returning false needs rework
return false unless validate_pipeline

@samplesheet_headers = @pipeline.samplesheet_headers

Expand Down
2 changes: 1 addition & 1 deletion app/services/workflow_executions/status_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def initialize(workflow_execution, wes_connection, user = nil, params = {})
end

def execute
return false if @workflow_execution.run_id.nil? # TODO: returning false needs rework
return false if @workflow_execution.run_id.nil?

run_status = @wes_client.get_run_status(@workflow_execution.run_id)

Expand Down
2 changes: 1 addition & 1 deletion app/services/workflow_executions/submission_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize(workflow_execution, wes_connection, user = nil, params = {})
end

def execute
return false unless @workflow_execution.prepared? # TODO: returning false needs rework
return false unless @workflow_execution.prepared?

run = @wes_client.run_workflow(**@workflow_execution.as_wes_params)

Expand Down
1 change: 1 addition & 0 deletions config/locales/en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ en:
attributes:
name:
blank: can't be blank
invalid_job_state: Workflow Execution job is unable to be processed by Job type '{job_name}'
invalid_namespace: can only be Group or Project
invalid_workflow: unable to find workflow with name '%{workflow_name}' and version '%{workflow_version}'
missing_namespace: does not have a namespace
Expand Down
1 change: 1 addition & 0 deletions config/locales/fr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ fr:
attributes:
name:
blank: Ne peut pas être vide
invalid_job_state: Workflow Execution job is unable to be processed by Job type '{job_name}'
invalid_namespace: Ne peut être qu’un groupe ou un projet
invalid_workflow: Impossible de trouver le flux de travail avec le nom '%{workflow_name}' et la version '%{workflow_version}'
missing_namespace: N’a pas d’espace de noms
Expand Down
57 changes: 57 additions & 0 deletions test/jobs/workflow_execution_completion_job_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# frozen_string_literal: true

require 'test_helper'
require 'test_helpers/blob_test_helpers'
require 'active_job_test_case'

class WorkflowExecutionCompletionJobTest < ActiveJobTestCase
include BlobTestHelpers

def setup
@workflow_execution_canceling = workflow_executions(:irida_next_example_canceling)

# get a new secure token the workflow execution
@workflow_execution_completing = workflow_executions(:irida_next_example_completing_a)
blob_run_directory_a = ActiveStorage::Blob.generate_unique_secure_token
@workflow_execution_completing.blob_run_directory = blob_run_directory_a
@workflow_execution_completing.save

# create file blobs
@normal_output_json_file_blob = make_and_upload_blob(
filepath: 'test/fixtures/files/blob_outputs/normal/iridanext.output.json',
blob_run_directory: blob_run_directory_a,
gzip: true
)
@normal_output_summary_file_blob = make_and_upload_blob(
filepath: 'test/fixtures/files/blob_outputs/normal/summary.txt',
blob_run_directory: blob_run_directory_a
)
end

def teardown
# reset connections after each test to clear cache
Faraday.default_connection = nil
end

test 'successful job execution' do
workflow_execution = @workflow_execution_completing

perform_enqueued_jobs(only: WorkflowExecutionCompletionJob) do
WorkflowExecutionCompletionJob.perform_later(workflow_execution)
end

assert_enqueued_jobs(1, only: WorkflowExecutionCleanupJob)
assert_performed_jobs(1, only: WorkflowExecutionCompletionJob)
workflow_execution.reload.state.to_sym == :completed
end

test 'successful invalid execution' do
perform_enqueued_jobs(only: WorkflowExecutionCompletionJob) do
WorkflowExecutionCompletionJob.perform_later(@workflow_execution_canceling)
end

assert_enqueued_jobs(1, only: WorkflowExecutionCleanupJob)
assert_performed_jobs(1, only: WorkflowExecutionCompletionJob)
@workflow_execution_canceling.reload.state.to_sym == :error
end
end
46 changes: 46 additions & 0 deletions test/jobs/workflow_execution_preparation_job_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# frozen_string_literal: true

require 'test_helper'
require 'active_job_test_case'

class WorkflowExecutionPreparationJobTest < ActiveJobTestCase
def setup
@workflow_execution = workflow_executions(:irida_next_example_new)
@workflow_execution_canceling = workflow_executions(:irida_next_example_canceling)
@workflow_execution_completed = workflow_executions(:irida_next_example_completed)
end

def teardown
# reset connections after each test to clear cache
Faraday.default_connection = nil
end

test 'successful job execution' do
perform_enqueued_jobs(only: WorkflowExecutionPreparationJob) do
WorkflowExecutionPreparationJob.perform_later(@workflow_execution)
end

assert_enqueued_jobs(1, only: WorkflowExecutionSubmissionJob)
assert_performed_jobs(1, only: WorkflowExecutionPreparationJob)
@workflow_execution.reload.state.to_sym == :prepared
end

test 'successful execution with early exit due to user canceling job' do
perform_enqueued_jobs(only: WorkflowExecutionPreparationJob) do
WorkflowExecutionPreparationJob.perform_later(@workflow_execution_canceling)
end

assert_enqueued_jobs(0)
@workflow_execution_canceling.reload.state.to_sym == :canceling
end

test 'successful invalid execution' do
perform_enqueued_jobs(only: WorkflowExecutionPreparationJob) do
WorkflowExecutionPreparationJob.perform_later(@workflow_execution_completed)
end

assert_enqueued_jobs(1, only: WorkflowExecutionCleanupJob)
assert_performed_jobs(1, only: WorkflowExecutionPreparationJob)
@workflow_execution_completed.reload.state.to_sym == :error
end
end
2 changes: 1 addition & 1 deletion test/services/workflow_executions/cleanup_service_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def setup # rubocop:disable Metrics/MethodLength
@unrelated_file_blob.download
end

assert ret_value == false
assert ret_value.nil?
end

test 'do not clean if blob_run_directory is nil' do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def setup
assert @workflow_execution_non_executable.initial?

assert_no_difference -> { ActiveStorage::Attachment.count } do
WorkflowExecutions::PreparationService.new(@workflow_execution_non_executable, @user, {}).execute
result = WorkflowExecutions::PreparationService.new(@workflow_execution_non_executable, @user, {}).execute
assert_not result
end

assert_equal 'error', @workflow_execution_non_executable.state
Expand Down