From eaba25c1bc050cadae7c5d9cdea8112f8aab5adf Mon Sep 17 00:00:00 2001 From: Jeffrey Thiessen Date: Wed, 27 Aug 2025 19:51:32 +0000 Subject: [PATCH 1/3] first cut, code flow done --- app/controllers/concerns/workflow_execution_actions.rb | 4 ++-- app/jobs/workflow_execution_cancelation_job.rb | 8 +++++++- app/jobs/workflow_execution_cleanup_job.rb | 4 ++-- app/jobs/workflow_execution_completion_job.rb | 7 ++++++- app/jobs/workflow_execution_job.rb | 6 ++++++ app/jobs/workflow_execution_preparation_job.rb | 6 +----- app/jobs/workflow_execution_status_job.rb | 8 ++++++++ app/jobs/workflow_execution_submission_job.rb | 6 +++++- app/services/workflow_executions/cancelation_service.rb | 5 +---- app/services/workflow_executions/cleanup_service.rb | 2 +- app/services/workflow_executions/completion_service.rb | 5 +---- app/services/workflow_executions/preparation_service.rb | 2 +- app/services/workflow_executions/status_service.rb | 2 +- app/services/workflow_executions/submission_service.rb | 2 +- config/locales/en.yml | 1 + config/locales/fr.yml | 1 + 16 files changed, 45 insertions(+), 24 deletions(-) diff --git a/app/controllers/concerns/workflow_execution_actions.rb b/app/controllers/concerns/workflow_execution_actions.rb index d0285e4c61..b26f2cfbce 100644 --- a/app/controllers/concerns/workflow_execution_actions.rb +++ b/app/controllers/concerns/workflow_execution_actions.rb @@ -99,11 +99,11 @@ def destroy # rubocop:disable Metrics/MethodLength end def cancel # rubocop:disable Metrics/MethodLength - WorkflowExecutions::CancelService.new(@workflow_execution, current_user).execute + result = WorkflowExecutions::CancelService.new(@workflow_execution, current_user).execute respond_to do |format| format.turbo_stream do - if @workflow_execution.canceled? || @workflow_execution.canceling? + if result && (@workflow_execution.canceled? || @workflow_execution.canceling?) render status: :ok, locals: { type: 'success', message: t('concerns.workflow_execution_actions.cancel.success', diff --git a/app/jobs/workflow_execution_cancelation_job.rb b/app/jobs/workflow_execution_cancelation_job.rb index cf864529fa..0faf3222fd 100644 --- a/app/jobs/workflow_execution_cancelation_job.rb +++ b/app/jobs/workflow_execution_cancelation_job.rb @@ -43,6 +43,12 @@ def perform(workflow_execution, user) end wes_connection = Integrations::Ga4ghWesApi::V1::ApiConnection.new.conn - WorkflowExecutions::CancelationService.new(workflow_execution, wes_connection, user).execute + result = WorkflowExecutions::CancelationService.new(workflow_execution, wes_connection, user).execute + + if result + WorkflowExecutionCleanupJob.perform_later(workflow_execution) + else + handle_unable_to_process_job(workflow_execution, self.class.name) unless result + end end end diff --git a/app/jobs/workflow_execution_cleanup_job.rb b/app/jobs/workflow_execution_cleanup_job.rb index 354774423a..631ac7423c 100644 --- a/app/jobs/workflow_execution_cleanup_job.rb +++ b/app/jobs/workflow_execution_cleanup_job.rb @@ -14,7 +14,7 @@ def perform(workflow_execution) workflow_execution.canceled? || workflow_execution.error? - # TODO: early return from cleanup service unhandled - workflow_execution = WorkflowExecutions::CleanupService.new(workflow_execution).execute # rubocop:disable Lint/UselessAssignment + # Final stage of WorkflowExecution life cycle. No further work required. + WorkflowExecutions::CleanupService.new(workflow_execution).execute end end diff --git a/app/jobs/workflow_execution_completion_job.rb b/app/jobs/workflow_execution_completion_job.rb index 44a16e761f..ed3c1a43ef 100644 --- a/app/jobs/workflow_execution_completion_job.rb +++ b/app/jobs/workflow_execution_completion_job.rb @@ -11,7 +11,12 @@ def perform(workflow_execution) return handle_error_state_and_clean(workflow_execution) end - WorkflowExecutions::CompletionService.new(workflow_execution).execute + result = WorkflowExecutions::CompletionService.new(workflow_execution).execute + if result + WorkflowExecutionCleanupJob.perform_later(workflow_execution) + else + handle_unable_to_process_job(workflow_execution, self.class.name) + end # TODO: this job doesn't have any tests. Only the service has tests. # Tests should be added when job queuing from CompletionService is refactored into this job. diff --git a/app/jobs/workflow_execution_job.rb b/app/jobs/workflow_execution_job.rb index 4be8840ed4..762e563e4a 100644 --- a/app/jobs/workflow_execution_job.rb +++ b/app/jobs/workflow_execution_job.rb @@ -23,4 +23,10 @@ def handle_error_state_and_clean(workflow_execution) workflow_execution.update_attribute('state', :error) # rubocop:disable Rails/SkipsModelValidations WorkflowExecutionCleanupJob.perform_later(workflow_execution) end + + def handle_unable_to_process_job(workflow_execution, job_name) + workflow_execution.errors.add(:base, + I18n.t('activerecord.errors.models.workflow_execution.invalid_job_state', job_name:)) + handle_error_state_and_clean(workflow_execution) + end end diff --git a/app/jobs/workflow_execution_preparation_job.rb b/app/jobs/workflow_execution_preparation_job.rb index aa1679ba51..08269b4c02 100644 --- a/app/jobs/workflow_execution_preparation_job.rb +++ b/app/jobs/workflow_execution_preparation_job.rb @@ -14,16 +14,12 @@ def perform(workflow_execution) return handle_error_state_and_clean(workflow_execution) end - # TODO: the service returning false is actually used correctly here - # When the other jobs/services early returns are refactored this should also be included. result = WorkflowExecutions::PreparationService.new(workflow_execution).execute if result WorkflowExecutionSubmissionJob.perform_later(workflow_execution) else - @workflow_execution.state = :error - @workflow_execution.cleaned = true - @workflow_execution.save + handle_unable_to_process_job(workflow_execution, self.class.name) end # TODO: this job doesn't have any tests. Only the service has tests. diff --git a/app/jobs/workflow_execution_status_job.rb b/app/jobs/workflow_execution_status_job.rb index 2d32827aea..9d73a187e0 100644 --- a/app/jobs/workflow_execution_status_job.rb +++ b/app/jobs/workflow_execution_status_job.rb @@ -32,6 +32,14 @@ def perform(workflow_execution) wes_connection = Integrations::Ga4ghWesApi::V1::ApiConnection.new.conn workflow_execution = WorkflowExecutions::StatusService.new(workflow_execution, wes_connection).execute + if workflow_execution + queue_next_job(workflow_execution) + else + handle_unable_to_process_job(workflow_execution, self.class.name) + end + end + + def queue_next_job(workflow_execution) case workflow_execution.state.to_sym when :canceled, :error WorkflowExecutionCleanupJob.perform_later(workflow_execution) diff --git a/app/jobs/workflow_execution_submission_job.rb b/app/jobs/workflow_execution_submission_job.rb index d68b63c01a..7459529959 100644 --- a/app/jobs/workflow_execution_submission_job.rb +++ b/app/jobs/workflow_execution_submission_job.rb @@ -32,6 +32,10 @@ def perform(workflow_execution) wes_connection = Integrations::Ga4ghWesApi::V1::ApiConnection.new.conn workflow_execution = WorkflowExecutions::SubmissionService.new(workflow_execution, wes_connection).execute - WorkflowExecutionStatusJob.set(wait_until: 30.seconds.from_now).perform_later(workflow_execution) + if workflow_execution + WorkflowExecutionStatusJob.set(wait_until: 30.seconds.from_now).perform_later(workflow_execution) + else + handle_unable_to_process_job(workflow_execution, self.class.name) + end end end diff --git a/app/services/workflow_executions/cancelation_service.rb b/app/services/workflow_executions/cancelation_service.rb index 84458f5c98..2eac370e27 100644 --- a/app/services/workflow_executions/cancelation_service.rb +++ b/app/services/workflow_executions/cancelation_service.rb @@ -10,7 +10,7 @@ def initialize(workflow_execution, wes_connection, user = nil, params = {}) end def execute - return false unless @workflow_execution.canceling? # TODO: returning false needs rework + return false unless @workflow_execution.canceling? @wes_client.cancel_run(@workflow_execution.run_id) @@ -19,9 +19,6 @@ def execute @workflow_execution.save - # TODO: job queuing should be handled by the job that called this service - WorkflowExecutionCleanupJob.perform_later(@workflow_execution) - @workflow_execution end end diff --git a/app/services/workflow_executions/cleanup_service.rb b/app/services/workflow_executions/cleanup_service.rb index b97c591421..ccf768d5a6 100644 --- a/app/services/workflow_executions/cleanup_service.rb +++ b/app/services/workflow_executions/cleanup_service.rb @@ -9,7 +9,7 @@ def initialize(workflow_execution, user = nil, params = {}) end def execute - return false if @workflow_execution.nil? || @workflow_execution.cleaned? # TODO: returning false needs rework + return if @workflow_execution.nil? || @workflow_execution.cleaned? # no further work needed # This check is for safety as passing nil or an empty string into deleted_prefixed will delete all blobs if @workflow_execution.blob_run_directory.present? diff --git a/app/services/workflow_executions/completion_service.rb b/app/services/workflow_executions/completion_service.rb index 4e0731eec4..e17ef4b5f3 100644 --- a/app/services/workflow_executions/completion_service.rb +++ b/app/services/workflow_executions/completion_service.rb @@ -16,7 +16,7 @@ def initialize(workflow_execution, params = {}) end def execute # rubocop:disable Metrics/MethodLength - return false unless @workflow_execution.completing? # TODO: returning false needs rework + return false unless @workflow_execution.completing? run_output_data = download_decompress_parse_gziped_json("#{@output_base_path}iridanext.output.json.gz") @@ -45,9 +45,6 @@ def execute # rubocop:disable Metrics/MethodLength @workflow_execution.save - # TODO: job queuing should be handled by the job that called this service - WorkflowExecutionCleanupJob.perform_later(@workflow_execution) - @workflow_execution end diff --git a/app/services/workflow_executions/preparation_service.rb b/app/services/workflow_executions/preparation_service.rb index 060461dba5..9c4d0b6b12 100644 --- a/app/services/workflow_executions/preparation_service.rb +++ b/app/services/workflow_executions/preparation_service.rb @@ -18,7 +18,7 @@ def initialize(workflow_execution, user = nil, params = {}) def execute # rubocop:disable Metrics/MethodLength # confirm pipeline found - return false unless validate_pipeline # TODO: returning false needs rework + return false unless validate_pipeline @samplesheet_headers = @pipeline.samplesheet_headers diff --git a/app/services/workflow_executions/status_service.rb b/app/services/workflow_executions/status_service.rb index e981d20e5e..dc645b0f5b 100644 --- a/app/services/workflow_executions/status_service.rb +++ b/app/services/workflow_executions/status_service.rb @@ -11,7 +11,7 @@ def initialize(workflow_execution, wes_connection, user = nil, params = {}) end def execute - return false if @workflow_execution.run_id.nil? # TODO: returning false needs rework + return false if @workflow_execution.run_id.nil? run_status = @wes_client.get_run_status(@workflow_execution.run_id) diff --git a/app/services/workflow_executions/submission_service.rb b/app/services/workflow_executions/submission_service.rb index bfe467cb9e..46f4f801df 100644 --- a/app/services/workflow_executions/submission_service.rb +++ b/app/services/workflow_executions/submission_service.rb @@ -10,7 +10,7 @@ def initialize(workflow_execution, wes_connection, user = nil, params = {}) end def execute - return false unless @workflow_execution.prepared? # TODO: returning false needs rework + return false unless @workflow_execution.prepared? run = @wes_client.run_workflow(**@workflow_execution.as_wes_params) diff --git a/config/locales/en.yml b/config/locales/en.yml index 11505b31a6..04a4992cc6 100644 --- a/config/locales/en.yml +++ b/config/locales/en.yml @@ -143,6 +143,7 @@ en: attributes: name: blank: can't be blank + invalid_job_state: Workflow Execution job is unable to be processed by Job type '{job_name}' invalid_namespace: can only be Group or Project invalid_workflow: unable to find workflow with name '%{workflow_name}' and version '%{workflow_version}' missing_namespace: does not have a namespace diff --git a/config/locales/fr.yml b/config/locales/fr.yml index c88a355ec7..eec8711b7c 100644 --- a/config/locales/fr.yml +++ b/config/locales/fr.yml @@ -143,6 +143,7 @@ fr: attributes: name: blank: Ne peut pas être vide + invalid_job_state: Workflow Execution job is unable to be processed by Job type '{job_name}' invalid_namespace: Ne peut être qu’un groupe ou un projet invalid_workflow: Impossible de trouver le flux de travail avec le nom '%{workflow_name}' et la version '%{workflow_version}' missing_namespace: N’a pas d’espace de noms From 457e51bbf0058788767fa9b28853746dce67d31d Mon Sep 17 00:00:00 2001 From: Jeffrey Thiessen Date: Wed, 27 Aug 2025 20:37:12 +0000 Subject: [PATCH 2/3] fix cleanup service test case --- test/services/workflow_executions/cleanup_service_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/services/workflow_executions/cleanup_service_test.rb b/test/services/workflow_executions/cleanup_service_test.rb index cdd449d4f7..b9952864f4 100644 --- a/test/services/workflow_executions/cleanup_service_test.rb +++ b/test/services/workflow_executions/cleanup_service_test.rb @@ -86,7 +86,7 @@ def setup # rubocop:disable Metrics/MethodLength @unrelated_file_blob.download end - assert ret_value == false + assert ret_value.nil? end test 'do not clean if blob_run_directory is nil' do From be391543d7620970ad619233d1de2bfc10c454c7 Mon Sep 17 00:00:00 2001 From: Jeffrey Thiessen Date: Wed, 27 Aug 2025 23:12:00 +0000 Subject: [PATCH 3/3] add and update tests --- app/jobs/workflow_execution_completion_job.rb | 3 - .../workflow_execution_preparation_job.rb | 3 - .../workflow_execution_completion_job_test.rb | 57 +++++++++++++++++++ ...workflow_execution_preparation_job_test.rb | 46 +++++++++++++++ .../preparation_service_test.rb | 3 +- 5 files changed, 105 insertions(+), 7 deletions(-) create mode 100644 test/jobs/workflow_execution_completion_job_test.rb create mode 100644 test/jobs/workflow_execution_preparation_job_test.rb diff --git a/app/jobs/workflow_execution_completion_job.rb b/app/jobs/workflow_execution_completion_job.rb index ed3c1a43ef..87ce1a968d 100644 --- a/app/jobs/workflow_execution_completion_job.rb +++ b/app/jobs/workflow_execution_completion_job.rb @@ -17,8 +17,5 @@ def perform(workflow_execution) else handle_unable_to_process_job(workflow_execution, self.class.name) end - - # TODO: this job doesn't have any tests. Only the service has tests. - # Tests should be added when job queuing from CompletionService is refactored into this job. end end diff --git a/app/jobs/workflow_execution_preparation_job.rb b/app/jobs/workflow_execution_preparation_job.rb index 08269b4c02..0cf5354257 100644 --- a/app/jobs/workflow_execution_preparation_job.rb +++ b/app/jobs/workflow_execution_preparation_job.rb @@ -21,8 +21,5 @@ def perform(workflow_execution) else handle_unable_to_process_job(workflow_execution, self.class.name) end - - # TODO: this job doesn't have any tests. Only the service has tests. - # Tests should be added when job queuing from CompletionService is refactored into this job. end end diff --git a/test/jobs/workflow_execution_completion_job_test.rb b/test/jobs/workflow_execution_completion_job_test.rb new file mode 100644 index 0000000000..466dae71e9 --- /dev/null +++ b/test/jobs/workflow_execution_completion_job_test.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +require 'test_helper' +require 'test_helpers/blob_test_helpers' +require 'active_job_test_case' + +class WorkflowExecutionCompletionJobTest < ActiveJobTestCase + include BlobTestHelpers + + def setup + @workflow_execution_canceling = workflow_executions(:irida_next_example_canceling) + + # get a new secure token the workflow execution + @workflow_execution_completing = workflow_executions(:irida_next_example_completing_a) + blob_run_directory_a = ActiveStorage::Blob.generate_unique_secure_token + @workflow_execution_completing.blob_run_directory = blob_run_directory_a + @workflow_execution_completing.save + + # create file blobs + @normal_output_json_file_blob = make_and_upload_blob( + filepath: 'test/fixtures/files/blob_outputs/normal/iridanext.output.json', + blob_run_directory: blob_run_directory_a, + gzip: true + ) + @normal_output_summary_file_blob = make_and_upload_blob( + filepath: 'test/fixtures/files/blob_outputs/normal/summary.txt', + blob_run_directory: blob_run_directory_a + ) + end + + def teardown + # reset connections after each test to clear cache + Faraday.default_connection = nil + end + + test 'successful job execution' do + workflow_execution = @workflow_execution_completing + + perform_enqueued_jobs(only: WorkflowExecutionCompletionJob) do + WorkflowExecutionCompletionJob.perform_later(workflow_execution) + end + + assert_enqueued_jobs(1, only: WorkflowExecutionCleanupJob) + assert_performed_jobs(1, only: WorkflowExecutionCompletionJob) + workflow_execution.reload.state.to_sym == :completed + end + + test 'successful invalid execution' do + perform_enqueued_jobs(only: WorkflowExecutionCompletionJob) do + WorkflowExecutionCompletionJob.perform_later(@workflow_execution_canceling) + end + + assert_enqueued_jobs(1, only: WorkflowExecutionCleanupJob) + assert_performed_jobs(1, only: WorkflowExecutionCompletionJob) + @workflow_execution_canceling.reload.state.to_sym == :error + end +end diff --git a/test/jobs/workflow_execution_preparation_job_test.rb b/test/jobs/workflow_execution_preparation_job_test.rb new file mode 100644 index 0000000000..9092427be1 --- /dev/null +++ b/test/jobs/workflow_execution_preparation_job_test.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +require 'test_helper' +require 'active_job_test_case' + +class WorkflowExecutionPreparationJobTest < ActiveJobTestCase + def setup + @workflow_execution = workflow_executions(:irida_next_example_new) + @workflow_execution_canceling = workflow_executions(:irida_next_example_canceling) + @workflow_execution_completed = workflow_executions(:irida_next_example_completed) + end + + def teardown + # reset connections after each test to clear cache + Faraday.default_connection = nil + end + + test 'successful job execution' do + perform_enqueued_jobs(only: WorkflowExecutionPreparationJob) do + WorkflowExecutionPreparationJob.perform_later(@workflow_execution) + end + + assert_enqueued_jobs(1, only: WorkflowExecutionSubmissionJob) + assert_performed_jobs(1, only: WorkflowExecutionPreparationJob) + @workflow_execution.reload.state.to_sym == :prepared + end + + test 'successful execution with early exit due to user canceling job' do + perform_enqueued_jobs(only: WorkflowExecutionPreparationJob) do + WorkflowExecutionPreparationJob.perform_later(@workflow_execution_canceling) + end + + assert_enqueued_jobs(0) + @workflow_execution_canceling.reload.state.to_sym == :canceling + end + + test 'successful invalid execution' do + perform_enqueued_jobs(only: WorkflowExecutionPreparationJob) do + WorkflowExecutionPreparationJob.perform_later(@workflow_execution_completed) + end + + assert_enqueued_jobs(1, only: WorkflowExecutionCleanupJob) + assert_performed_jobs(1, only: WorkflowExecutionPreparationJob) + @workflow_execution_completed.reload.state.to_sym == :error + end +end diff --git a/test/services/workflow_executions/preparation_service_test.rb b/test/services/workflow_executions/preparation_service_test.rb index c18ce38122..bd8e6c0948 100644 --- a/test/services/workflow_executions/preparation_service_test.rb +++ b/test/services/workflow_executions/preparation_service_test.rb @@ -56,7 +56,8 @@ def setup assert @workflow_execution_non_executable.initial? assert_no_difference -> { ActiveStorage::Attachment.count } do - WorkflowExecutions::PreparationService.new(@workflow_execution_non_executable, @user, {}).execute + result = WorkflowExecutions::PreparationService.new(@workflow_execution_non_executable, @user, {}).execute + assert_not result end assert_equal 'error', @workflow_execution_non_executable.state