From 8e7c73d2565e2fb65debc83b8b3c459112b9f27b Mon Sep 17 00:00:00 2001
From: mvdbeek
Date: Wed, 21 May 2025 11:48:37 +0200
Subject: [PATCH 01/21] Improve performance of job cache query by turning the correlated subquery that checks for deleted job outputs into an outerjoin.

Brings the query time down from 374937.263 to 19373.252, so roughly a 20-fold improvement. Still a little slow but more manageable. The remainder can likely be improved by an additional compound index.

SQL Before:

```
SELECT job.id, job_to_input_dataset_1.dataset_id, job_to_input_dataset_2.dataset_id AS dataset_id_1
FROM job
JOIN history ON job.history_id = history.id
JOIN job_parameter AS job_parameter_1 ON job.id = job_parameter_1.job_id
JOIN job_parameter AS job_parameter_2 ON job.id = job_parameter_2.job_id
JOIN job_parameter AS job_parameter_3 ON job.id = job_parameter_3.job_id
JOIN job_parameter AS job_parameter_4 ON job.id = job_parameter_4.job_id
JOIN job_to_input_dataset AS job_to_input_dataset_1 ON job_to_input_dataset_1.job_id = job.id
JOIN history_dataset_association AS history_dataset_association_1 ON job_to_input_dataset_1.dataset_id = history_dataset_association_1.id
JOIN history_dataset_association AS history_dataset_association_2 ON history_dataset_association_2.dataset_id = history_dataset_association_1.dataset_id
JOIN job_to_input_dataset AS job_to_input_dataset_2 ON job_to_input_dataset_2.job_id = job.id
JOIN history_dataset_association AS history_dataset_association_3 ON job_to_input_dataset_2.dataset_id = history_dataset_association_3.id
JOIN history_dataset_association AS history_dataset_association_4 ON history_dataset_association_4.dataset_id = history_dataset_association_3.dataset_id
WHERE job.tool_id = 'toolshed.g2.bx.psu.edu/repos/iuc/hisat2/hisat2/2.2.1+galaxy1'
AND ( job.user_id = 392010 OR history.published = true )
AND job.copied_from_job_id IS NULL
AND job.tool_version = '2.2.1+galaxy1'
AND job.state IN ('ok')
AND ( EXISTS ( SELECT history_dataset_collection_association.id FROM history_dataset_collection_association, job_to_output_dataset_collection WHERE job.id = job_to_output_dataset_collection.job_id AND history_dataset_collection_association.id = job_to_output_dataset_collection.dataset_collection_id AND history_dataset_collection_association.deleted = true ) ) = false
AND ( EXISTS ( SELECT history_dataset_association.id FROM history_dataset_association, job_to_output_dataset WHERE job.id = job_to_output_dataset.job_id AND history_dataset_association.id = job_to_output_dataset.dataset_id AND history_dataset_association.deleted = true ) ) = false
AND job.id = job_parameter_1.job_id AND job_parameter_1.name = 'reference_genome'
AND job_parameter_1.value LIKE '{"__current_case__": 1, "history_item": {"values": [{"id": %, "src": "hda"}]}, "source": "history"}'
AND job.id = job_parameter_2.job_id AND job_parameter_2.name = 'library'
AND job_parameter_2.value LIKE '{"__current_case__": 0, "input_1": {"values": [{"id": %, "src": "hda"}]}, "rna_strandness": "", "type": "single"}'
AND job.id = job_parameter_3.job_id AND job_parameter_3.name = 'sum'
AND job_parameter_3.value = '{"new_summary": false, "summary_file": false}'
AND job.id = job_parameter_4.job_id AND job_parameter_4.name = 'adv'
AND job_parameter_4.value = '{"alignment_options": {"__current_case__": 0, "alignment_options_selector": "defaults"}, "input_options": {"__current_case__": 0, "input_options_selector": "defaults"}, "other_options": {"__current_case__": 0, "other_options_selector": "defaults"}, "output_options": {"__current_case__": 0, "output_options_selector": "defaults"}, "reporting_options": {"__current_case__": 0, "reporting_options_selector": "defaults"}, "sam_options": {"__current_case__": 0, "sam_options_selector": "defaults"}, "scoring_options": {"__current_case__": 0, "scoring_options_selector": "defaults"}, "spliced_options": {"__current_case__": 0, "spliced_options_selector": "defaults"}}'
AND job_to_input_dataset_1.name IN ('reference_genome|history_item', 'history_item')
AND history_dataset_association_2.id = 152775960
AND ( ( job_to_input_dataset_1.dataset_version IN (0, history_dataset_association_1.version) OR history_dataset_association_1.update_time < job.create_time ) AND history_dataset_association_1.extension = history_dataset_association_2.extension AND history_dataset_association_1.name = history_dataset_association_2.name OR history_dataset_association_1.id IN ( SELECT history_dataset_association.id FROM history_dataset_association, history_dataset_association_history AS history_dataset_association_history_1 WHERE history_dataset_association.id = history_dataset_association_history_1.history_dataset_association_id AND history_dataset_association_history_1.name = history_dataset_association_2.name AND history_dataset_association_history_1.extension = history_dataset_association_2.extension AND job_to_input_dataset_1.dataset_version = history_dataset_association_history_1.version AND history_dataset_association_history_1.metadata = history_dataset_association_2.metadata ) )
AND ( history_dataset_association_1.deleted = false OR history_dataset_association_2.deleted = false )
AND job_to_input_dataset_2.name IN ('input_1', 'library|input_1')
AND history_dataset_association_4.id = 152726579
AND ( ( job_to_input_dataset_2.dataset_version IN (0, history_dataset_association_3.version) OR history_dataset_association_3.update_time < job.create_time ) AND history_dataset_association_3.extension = history_dataset_association_4.extension AND history_dataset_association_3.name = history_dataset_association_4.name OR history_dataset_association_3.id IN ( SELECT history_dataset_association.id FROM history_dataset_association, history_dataset_association_history AS history_dataset_association_history_2 WHERE history_dataset_association.id = history_dataset_association_history_2.history_dataset_association_id AND history_dataset_association_history_2.name = history_dataset_association_4.name AND history_dataset_association_history_2.extension = history_dataset_association_4.extension AND job_to_input_dataset_2.dataset_version = history_dataset_association_history_2.version AND history_dataset_association_history_2.metadata = history_dataset_association_4.metadata ) )
AND ( history_dataset_association_3.deleted = false OR history_dataset_association_4.deleted = false )
GROUP BY job.id, job_to_input_dataset_1.dataset_id, job_to_input_dataset_2.dataset_id
ORDER BY job.id DESC;
```

After (changed the aliases manually):

```
EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS, FORMAT JSON)
SELECT job.id, job_to_input_dataset_1.dataset_id, job_to_input_dataset_2.dataset_id AS dataset_id_1
FROM job
JOIN history ON job.history_id = history.id
LEFT OUTER JOIN job_to_output_dataset_collection AS job_to_output_dataset_collection_1 ON job.id = job_to_output_dataset_collection_1.job_id
LEFT OUTER JOIN history_dataset_collection_association AS history_dataset_collection_association_1_deleted ON history_dataset_collection_association_1_deleted.id = job_to_output_dataset_collection_1.dataset_collection_id AND history_dataset_collection_association_1_deleted.deleted = true
LEFT OUTER JOIN job_to_output_dataset AS job_to_output_dataset_1 ON job.id = job_to_output_dataset_1.job_id
LEFT OUTER JOIN history_dataset_association AS history_dataset_association_1_deleted ON history_dataset_association_1_deleted.id = job_to_output_dataset_1.dataset_id AND history_dataset_association_1_deleted.deleted = true
JOIN job_parameter AS job_parameter_1 ON job.id = job_parameter_1.job_id
JOIN job_parameter AS job_parameter_2 ON job.id = job_parameter_2.job_id
JOIN job_parameter AS job_parameter_3 ON job.id = job_parameter_3.job_id
JOIN job_parameter AS job_parameter_4 ON job.id = job_parameter_4.job_id
JOIN job_to_input_dataset AS job_to_input_dataset_1 ON job_to_input_dataset_1.job_id = job.id
JOIN history_dataset_association AS history_dataset_association_1 ON job_to_input_dataset_1.dataset_id = history_dataset_association_1.id
JOIN history_dataset_association AS history_dataset_association_2 ON history_dataset_association_2.dataset_id = history_dataset_association_1.dataset_id
JOIN job_to_input_dataset AS job_to_input_dataset_2 ON job_to_input_dataset_2.job_id = job.id
JOIN history_dataset_association AS history_dataset_association_3 ON job_to_input_dataset_2.dataset_id = history_dataset_association_3.id
JOIN history_dataset_association AS history_dataset_association_4 ON history_dataset_association_4.dataset_id = history_dataset_association_3.dataset_id
WHERE job.tool_id = 'toolshed.g2.bx.psu.edu/repos/iuc/hisat2/hisat2/2.2.1+galaxy1'
AND ( job.user_id = 392010 OR history.published = true )
AND job.copied_from_job_id IS NULL
AND job.tool_version = '2.2.1+galaxy1'
AND job.state IN ('ok')
AND job_to_output_dataset_collection_1.job_id IS NULL
AND job_to_output_dataset_1.job_id IS NULL
AND job.id = job_parameter_1.job_id AND job_parameter_1.name = 'reference_genome'
AND job_parameter_1.value LIKE '{"__current_case__": 1, "history_item": {"values": [{"id": %, "src": "hda"}]}, "source": "history"}'
AND job.id = job_parameter_2.job_id AND job_parameter_2.name = 'library'
AND job_parameter_2.value LIKE '{"__current_case__": 0, "input_1": {"values": [{"id": %, "src": "hda"}]}, "rna_strandness": "", "type": "single"}'
AND job.id = job_parameter_3.job_id AND job_parameter_3.name = 'sum'
AND job_parameter_3.value = '{"new_summary": false, "summary_file": false}'
AND job.id = job_parameter_4.job_id AND job_parameter_4.name = 'adv'
AND job_parameter_4.value = '{"alignment_options": {"__current_case__": 0, "alignment_options_selector": "defaults"}, "input_options": {"__current_case__": 0, "input_options_selector": "defaults"}, "other_options": {"__current_case__": 0, "other_options_selector": "defaults"}, "output_options": {"__current_case__": 0, "output_options_selector": "defaults"}, "reporting_options": {"__current_case__": 0, "reporting_options_selector": "defaults"}, "sam_options": {"__current_case__": 0, "sam_options_selector": "defaults"}, "scoring_options": {"__current_case__": 0, "scoring_options_selector": "defaults"}, "spliced_options": {"__current_case__": 0, "spliced_options_selector": "defaults"}}'
AND job_to_input_dataset_1.name IN ('reference_genome|history_item', 'history_item')
AND history_dataset_association_2.id = 152775960
AND ( ( job_to_input_dataset_1.dataset_version IN (0, history_dataset_association_1.version) OR history_dataset_association_1.update_time < job.create_time ) AND history_dataset_association_1.extension = history_dataset_association_2.extension AND history_dataset_association_1.name = history_dataset_association_2.name OR history_dataset_association_1.id IN ( SELECT history_dataset_association.id FROM history_dataset_association, history_dataset_association_history AS history_dataset_association_history_1 WHERE history_dataset_association.id = history_dataset_association_history_1.history_dataset_association_id AND history_dataset_association_history_1.name = history_dataset_association_2.name AND history_dataset_association_history_1.extension = history_dataset_association_2.extension AND job_to_input_dataset_1.dataset_version = history_dataset_association_history_1.version AND history_dataset_association_history_1.metadata = history_dataset_association_2.metadata ) )
AND ( history_dataset_association_1.deleted = false OR history_dataset_association_2.deleted = false )
AND job_to_input_dataset_2.name IN ('input_1', 'library|input_1')
AND history_dataset_association_4.id = 152726579
AND ( ( job_to_input_dataset_2.dataset_version IN (0, history_dataset_association_3.version) OR history_dataset_association_3.update_time < job.create_time ) AND history_dataset_association_3.extension = history_dataset_association_4.extension AND history_dataset_association_3.name = history_dataset_association_4.name OR history_dataset_association_3.id IN ( SELECT history_dataset_association.id FROM history_dataset_association, history_dataset_association_history AS history_dataset_association_history_2 WHERE history_dataset_association.id = history_dataset_association_history_2.history_dataset_association_id AND history_dataset_association_history_2.name = history_dataset_association_4.name AND history_dataset_association_history_2.extension = history_dataset_association_4.extension AND job_to_input_dataset_2.dataset_version = history_dataset_association_history_2.version AND history_dataset_association_history_2.metadata = history_dataset_association_4.metadata ) )
AND ( history_dataset_association_3.deleted = false OR history_dataset_association_4.deleted = false )
GROUP BY job.id, job_to_input_dataset_1.dataset_id, job_to_input_dataset_2.dataset_id
ORDER BY job.id DESC;
```
--- lib/galaxy/managers/jobs.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index b173bbcd14ec..510192c6236c 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -589,14 +589,7 @@ def _build_job_query(self, tool_id: str, user_id: int, tool_version: Optional[st if wildcard_param_dump.get("__when_value__") is False: job_states = {Job.states.SKIPPED} stmt = stmt.where(Job.state.in_(job_states)) - - # exclude jobs with deleted outputs - stmt = stmt.where( - and_( - model.Job.any_output_dataset_collection_instances_deleted == false(), - model.Job.any_output_dataset_deleted == false(), - ) - ) + stmt = self._exclude_jobs_with_deleted_outputs(stmt) for k, v in wildcard_param_dump.items(): if v == {"__class__": "RuntimeValue"}: @@ -633,6 +626,30 @@ def _build_job_query(self, tool_id: str, user_id: int, tool_version: Optional[st return stmt + def _exclude_jobs_with_deleted_outputs(self, stmt): + jtodca = aliased(model.JobToOutputDatasetCollectionAssociation) + deleted_output_hdca = aliased(model.HistoryDatasetCollectionAssociation) + jtoda =
aliased(model.JobToOutputDatasetAssociation) + deleted_output_hda = aliased(model.HistoryDatasetAssociation) + return ( + stmt.outerjoin(jtodca, model.Job.id == jtodca.job_id) + .outerjoin( + deleted_output_hdca, + and_(deleted_output_hdca.id == jtodca.dataset_collection_id, deleted_output_hdca.deleted == true()), + ) + .outerjoin(jtoda, model.Job.id == jtoda.job_id) + .outerjoin( + deleted_output_hda, + and_(deleted_output_hda.id == jtoda.dataset_id, deleted_output_hda.deleted == true()), + ) + .where( + and_( + jtodca.job_id.is_(None), # no matching deleted collection was found. + jtoda.job_id.is_(None), # no matching deleted dataset was found. + ) + ) + ) + def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, require_name_match=True): a = aliased(model.JobToInputDatasetAssociation) b = aliased(model.HistoryDatasetAssociation) From 015a01f292c1a0009f36456a51e19835f4b2e464 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 21 May 2025 14:47:05 +0200 Subject: [PATCH 02/21] Fix where clause --- lib/galaxy/managers/jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 510192c6236c..57a5a8af0ab1 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -644,8 +644,8 @@ def _exclude_jobs_with_deleted_outputs(self, stmt): ) .where( and_( - jtodca.job_id.is_(None), # no matching deleted collection was found. - jtoda.job_id.is_(None), # no matching deleted dataset was found. + deleted_output_hdca.id.is_(None), # no matching deleted collection was found. + deleted_output_hda.id.is_(None), # no matching deleted dataset was found. ) ) ) From 904a23e922e059f485ceb878a8e1083bd4a687d5 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 21 May 2025 17:53:05 +0200 Subject: [PATCH 03/21] Restore not exists logic but runs against inner subquery --- lib/galaxy/managers/jobs.py | 75 +++++++++++++++++++++--------------- lib/galaxy/model/__init__.py | 27 ------------- 2 files changed, 44 insertions(+), 58 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 57a5a8af0ab1..76c0f0adb771 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -24,6 +24,7 @@ ) from sqlalchemy import ( and_, + exists, false, func, null, @@ -467,7 +468,7 @@ def replace_dataset_ids(path, key, value): for k, input_list in input_data.items(): # k will be matched against the JobParameter.name column. 
This can be prefixed depending on whethter # the input is in a repeat, or not (section and conditional) - k = {k, k.split("|")[-1]} + k = (k, k.split("|")[-1]) for type_values in input_list: t = type_values["src"] v = type_values["id"] @@ -488,7 +489,8 @@ def replace_dataset_ids(path, key, value): log.error("Unknown input data type %s", t) return None - stmt = stmt.where(*data_conditions).group_by(model.Job.id, *used_ids).order_by(model.Job.id.desc()) + stmt = stmt.where(*data_conditions).group_by(model.Job.id, *used_ids) + stmt = self._exclude_jobs_with_deleted_outputs(stmt) for job in self.sa_session.execute(stmt): # We found a job that is equal in terms of tool_id, user, state and input datasets, @@ -557,7 +559,7 @@ def replace_dataset_ids(path, key, value): def _build_job_query(self, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump): """Build subquery that selects a job with correct job parameters.""" stmt = ( - select(model.Job.id) + select(model.Job.id.label("job_id")) .join(model.History, model.Job.history_id == model.History.id) .where( and_( @@ -589,7 +591,6 @@ def _build_job_query(self, tool_id: str, user_id: int, tool_version: Optional[st if wildcard_param_dump.get("__when_value__") is False: job_states = {Job.states.SKIPPED} stmt = stmt.where(Job.state.in_(job_states)) - stmt = self._exclude_jobs_with_deleted_outputs(stmt) for k, v in wildcard_param_dump.items(): if v == {"__class__": "RuntimeValue"}: @@ -627,37 +628,46 @@ def _build_job_query(self, tool_id: str, user_id: int, tool_version: Optional[st return stmt def _exclude_jobs_with_deleted_outputs(self, stmt): - jtodca = aliased(model.JobToOutputDatasetCollectionAssociation) - deleted_output_hdca = aliased(model.HistoryDatasetCollectionAssociation) - jtoda = aliased(model.JobToOutputDatasetAssociation) - deleted_output_hda = aliased(model.HistoryDatasetAssociation) - return ( - stmt.outerjoin(jtodca, model.Job.id == jtodca.job_id) - .outerjoin( - deleted_output_hdca, - and_(deleted_output_hdca.id == jtodca.dataset_collection_id, deleted_output_hdca.deleted == true()), - ) - .outerjoin(jtoda, model.Job.id == jtoda.job_id) - .outerjoin( - deleted_output_hda, - and_(deleted_output_hda.id == jtoda.dataset_id, deleted_output_hda.deleted == true()), + subquery_alias = stmt.subquery("filtered_jobs_subquery") + outer_select_columns = [subquery_alias.c[col.name] for col in stmt.selected_columns] + outer_stmt = select(*outer_select_columns).select_from(subquery_alias) + job_id_from_subquery = subquery_alias.c.job_id + deleted_collection_exists = exists().where( + and_( + model.JobToOutputDatasetCollectionAssociation.job_id == job_id_from_subquery, + model.JobToOutputDatasetCollectionAssociation.dataset_collection_id + == model.HistoryDatasetCollectionAssociation.id, + model.HistoryDatasetCollectionAssociation.deleted == true(), ) - .where( - and_( - deleted_output_hdca.id.is_(None), # no matching deleted collection was found. - deleted_output_hda.id.is_(None), # no matching deleted dataset was found. 
- ) + ) + + # Subquery for deleted output datasets + deleted_dataset_exists = exists().where( + and_( + model.JobToOutputDatasetAssociation.job_id == job_id_from_subquery, + model.JobToOutputDatasetAssociation.dataset_id == model.HistoryDatasetAssociation.id, + model.HistoryDatasetAssociation.deleted == true(), ) ) + # Exclude jobs where a deleted collection OR a deleted dataset exists + outer_stmt = outer_stmt.where( + and_( + ~deleted_collection_exists, # NOT EXISTS deleted collection + ~deleted_dataset_exists, # NOT EXISTS deleted dataset + ) + ).order_by(job_id_from_subquery.desc()) + return outer_stmt + def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, require_name_match=True): a = aliased(model.JobToInputDatasetAssociation) b = aliased(model.HistoryDatasetAssociation) c = aliased(model.HistoryDatasetAssociation) d = aliased(model.JobParameter) e = aliased(model.HistoryDatasetAssociationHistory) - stmt = stmt.add_columns(a.dataset_id) - used_ids.append(a.dataset_id) + labeled_col = a.dataset_id.label(f"{k[0]}_{v}") + stmt = stmt.add_columns(labeled_col) + used_ids.append(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) hda_stmt = select(model.HistoryDatasetAssociation.id).where( model.HistoryDatasetAssociation.id == e.history_dataset_association_id @@ -712,10 +722,11 @@ def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v): a = aliased(model.JobToInputLibraryDatasetAssociation) - stmt = stmt.add_columns(a.ldda_id) + labeled_col = a.ldda_id.label(f"{k[0]}_{v}") + stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) data_conditions.append(and_(a.name.in_(k), a.ldda_id == v)) - used_ids.append(a.ldda_id) + used_ids.append(labeled_col) return stmt def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_name_match=True): @@ -738,7 +749,8 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na hda_right = aliased(model.HistoryDatasetAssociation) # Start joins from job → input HDCA → its collection → DCE - stmt = stmt.add_columns(a.dataset_collection_id) + labeled_col = a.dataset_collection_id.label(f"{k[0]}_{v}") + stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) stmt = stmt.join(hdca_input, hdca_input.id == a.dataset_collection_id) stmt = stmt.join(root_collection, root_collection.id == hdca_input.collection_id) @@ -772,7 +784,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na ) ) - used_ids.append(a.dataset_collection_id) # input-side HDCA + used_ids.append(labeled_col) # input-side HDCA return stmt def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): @@ -788,7 +800,8 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): hda_right = aliased(model.HistoryDatasetAssociation) # Base joins - stmt = stmt.add_columns(a.dataset_collection_element_id) + labeled_col = a.dataset_collection_element_id.label(f"{k[0]}_{v}") + stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) stmt = stmt.join(dce_left[0], dce_left[0].id == a.dataset_collection_element_id) stmt = stmt.join(dce_right[0], dce_right[0].id == v) @@ -809,7 +822,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): data_conditions.append(and_(a.name.in_(k), hda_left.dataset_id == hda_right.dataset_id)) - used_ids.append(a.dataset_collection_element_id) + 
used_ids.append(labeled_col) return stmt diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index c063d8ec3677..f8506ad63b77 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1614,9 +1614,6 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable): back_populates="job", uselist=False ) - any_output_dataset_collection_instances_deleted = None - any_output_dataset_deleted = None - dict_collection_visible_keys = ["id", "state", "exit_code", "update_time", "create_time", "galaxy_version"] dict_element_visible_keys = [ "id", @@ -12082,30 +12079,6 @@ def __repr__(self): # ---------------------------------------------------------------------------------------- # The following statements must not precede the mapped models defined above. -Job.any_output_dataset_collection_instances_deleted = deferred( - column_property( # type:ignore[assignment] - exists(HistoryDatasetCollectionAssociation.id).where( - and_( - Job.id == JobToOutputDatasetCollectionAssociation.job_id, - HistoryDatasetCollectionAssociation.id == JobToOutputDatasetCollectionAssociation.dataset_collection_id, - HistoryDatasetCollectionAssociation.deleted == true(), - ) - ), - ) -) - -Job.any_output_dataset_deleted = deferred( - column_property( # type:ignore[assignment] - exists(HistoryDatasetAssociation.id).where( - and_( - Job.id == JobToOutputDatasetAssociation.job_id, - HistoryDatasetAssociation.table.c.id == JobToOutputDatasetAssociation.dataset_id, - HistoryDatasetAssociation.table.c.deleted == true(), - ) - ), - ) -) - History.average_rating = column_property( # type:ignore[assignment] select(func.avg(HistoryRatingAssociation.rating)) .where(HistoryRatingAssociation.history_id == History.id) From fad9c9198a040c4088b052f23ec5ffce0c65cbc9 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 23 May 2025 11:46:59 +0200 Subject: [PATCH 04/21] Limit jobs based on input datasets in inner subquery This should be roughly equivalent to the CTE in https://gist.github.com/mvdbeek/2d4e235bfd9531de7c87de0f0365ffe6 --- lib/galaxy/managers/jobs.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 76c0f0adb771..b06691888db1 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -454,7 +454,7 @@ def replace_dataset_ids(path, key, value): return key, value return key, value - stmt = self._build_job_query(tool_id, user.id, tool_version, job_state, wildcard_param_dump) + stmt = select(model.Job.id.label("job_id")) data_conditions: List = [] @@ -490,6 +490,7 @@ def replace_dataset_ids(path, key, value): return None stmt = stmt.where(*data_conditions).group_by(model.Job.id, *used_ids) + stmt = self._filter_jobs(stmt, tool_id, user.id, tool_version, job_state, wildcard_param_dump) stmt = self._exclude_jobs_with_deleted_outputs(stmt) for job in self.sa_session.execute(stmt): @@ -556,10 +557,15 @@ def replace_dataset_ids(path, key, value): log.info("No equivalent jobs found %s", search_timer) return None - def _build_job_query(self, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump): + def _filter_jobs( + self, stmt, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump + ): """Build subquery that selects a job with correct job parameters.""" + subquery_alias = stmt.subquery("job_ids_subquery") + outer_select_columns = [subquery_alias.c[col.name] for col in stmt.selected_columns] + stmt = 
select(*outer_select_columns).select_from(subquery_alias) stmt = ( - select(model.Job.id.label("job_id")) + stmt.join(model.Job, model.Job.id == subquery_alias.c.job_id) .join(model.History, model.Job.history_id == model.History.id) .where( and_( From 283c854b14a8d447dbd97a285cce5fe1b87fd2da Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 23 May 2025 14:59:23 +0200 Subject: [PATCH 05/21] JobParmeter is now always fully prefixed --- lib/galaxy/managers/jobs.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index b06691888db1..d3f78e23fd8e 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -468,7 +468,6 @@ def replace_dataset_ids(path, key, value): for k, input_list in input_data.items(): # k will be matched against the JobParameter.name column. This can be prefixed depending on whethter # the input is in a repeat, or not (section and conditional) - k = (k, k.split("|")[-1]) for type_values in input_list: t = type_values["src"] v = type_values["id"] @@ -671,7 +670,7 @@ def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, c = aliased(model.HistoryDatasetAssociation) d = aliased(model.JobParameter) e = aliased(model.HistoryDatasetAssociationHistory) - labeled_col = a.dataset_id.label(f"{k[0]}_{v}") + labeled_col = a.dataset_id.label(f"{k}_{v}") stmt = stmt.add_columns(labeled_col) used_ids.append(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) @@ -685,7 +684,7 @@ def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, stmt = stmt.join(d) data_conditions.append( and_( - d.name.in_({f"{_}|__identifier__" for _ in k}), + d.name == f"{k}|__identifier__", d.value == json.dumps(identifier), ) ) @@ -705,7 +704,7 @@ def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, ) data_conditions.append( and_( - a.name.in_(k), + a.name == k, c.id == v, # c is the requested job input HDA # We need to make sure that the job we are looking for has been run with identical inputs. 
# Here we deal with 3 requirements: @@ -728,10 +727,10 @@ def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v): a = aliased(model.JobToInputLibraryDatasetAssociation) - labeled_col = a.ldda_id.label(f"{k[0]}_{v}") + labeled_col = a.ldda_id.label(f"{k}_{v}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) - data_conditions.append(and_(a.name.in_(k), a.ldda_id == v)) + data_conditions.append(and_(a.name == k, a.ldda_id == v)) used_ids.append(labeled_col) return stmt @@ -755,7 +754,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na hda_right = aliased(model.HistoryDatasetAssociation) # Start joins from job → input HDCA → its collection → DCE - labeled_col = a.dataset_collection_id.label(f"{k[0]}_{v}") + labeled_col = a.dataset_collection_id.label(f"{k}_{v}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) stmt = stmt.join(hdca_input, hdca_input.id == a.dataset_collection_id) @@ -785,7 +784,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na data_conditions.append( and_( - a.name.in_(k), + a.name == k, hda_left.dataset_id == hda_right.dataset_id, ) ) @@ -806,7 +805,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): hda_right = aliased(model.HistoryDatasetAssociation) # Base joins - labeled_col = a.dataset_collection_element_id.label(f"{k[0]}_{v}") + labeled_col = a.dataset_collection_element_id.label(f"{k}_{v}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) stmt = stmt.join(dce_left[0], dce_left[0].id == a.dataset_collection_element_id) @@ -826,7 +825,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): stmt = stmt.outerjoin(hda_left, hda_left.id == leaf_left.hda_id) stmt = stmt.outerjoin(hda_right, hda_right.id == leaf_right.hda_id) - data_conditions.append(and_(a.name.in_(k), hda_left.dataset_id == hda_right.dataset_id)) + data_conditions.append(and_(a.name == k, hda_left.dataset_id == hda_right.dataset_id)) used_ids.append(labeled_col) return stmt From a43b4f96deeebc753311fa29728555873f3a30c9 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 23 May 2025 15:05:07 +0200 Subject: [PATCH 06/21] Use inner join for leaf HDAs inside HDCA / DCE search --- lib/galaxy/managers/jobs.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index d3f78e23fd8e..4386d614868e 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -779,8 +779,8 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na # Compare leaf-level HDAs leaf_left = dce_left[-1] leaf_right = dce_right[-1] - stmt = stmt.outerjoin(hda_left, hda_left.id == leaf_left.hda_id) - stmt = stmt.outerjoin(hda_right, hda_right.id == leaf_right.hda_id) + stmt = stmt.join(hda_left, hda_left.id == leaf_left.hda_id) + stmt = stmt.join(hda_right, hda_right.id == leaf_right.hda_id) data_conditions.append( and_( @@ -822,8 +822,8 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): # Compare dataset_ids at the leaf level leaf_left = dce_left[-1] leaf_right = dce_right[-1] - stmt = stmt.outerjoin(hda_left, hda_left.id == leaf_left.hda_id) - stmt = stmt.outerjoin(hda_right, hda_right.id == leaf_right.hda_id) + stmt = stmt.join(hda_left, hda_left.id == leaf_left.hda_id) + stmt = stmt.join(hda_right, 
hda_right.id == leaf_right.hda_id) data_conditions.append(and_(a.name == k, hda_left.dataset_id == hda_right.dataset_id)) From d22afc80aecc47dff6d91436630add3e11a5cd2f Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 23 May 2025 15:41:26 +0200 Subject: [PATCH 07/21] Improve performance of dataset_id comparisons in job search Previously, the dataset_id comparison was applied late in queries involving nested collections, leading to large intermediate result sets. This change introduces CTEs to pre-filter the target dataset_ids on the right-hand side of the comparison. By using IN (SELECT dataset_id FROM cte_name), the database can prune non-matching rows earlier in the query execution plan, significantly reducing execution time and resource consumption. --- lib/galaxy/managers/jobs.py | 138 ++++++++++++++++++++++++++++-------- 1 file changed, 107 insertions(+), 31 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 4386d614868e..ef43665d618b 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -744,16 +744,57 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na ) depth = collection_type.count(":") if collection_type else 0 - a = aliased(model.JobToInputDatasetCollectionAssociation) - hdca_input = aliased(model.HistoryDatasetCollectionAssociation) - root_collection = aliased(model.DatasetCollection) + # Aliases for the "left" side (input HDCA path) + a = aliased(model.JobToInputDatasetCollectionAssociation, name=f"job_to_input_dataset_collection_1_{k}_{v}") + hdca_input = aliased( + model.HistoryDatasetCollectionAssociation, name=f"history_dataset_collection_association_1_{k}_{v}" + ) + root_collection = aliased(model.DatasetCollection, name=f"dataset_collection_1_{k}_{v}") + dce_left = [ + aliased(model.DatasetCollectionElement, name=f"dataset_collection_element_left_{k}_{v}_{i}") + for i in range(depth + 1) + ] + hda_left = aliased(model.HistoryDatasetAssociation, name=f"history_dataset_association_1_{k}_{v}") + + # Aliases for the "right" side (target HDCA path) in the main query + hdca_target = aliased( + model.HistoryDatasetCollectionAssociation, name=f"history_dataset_collection_association_2_{k}_{v}" + ) + target_collection = aliased(model.DatasetCollection, name=f"dataset_collection_2_{k}_{v}") + dce_right = [ + aliased(model.DatasetCollectionElement, name=f"dataset_collection_element_right_{k}_{v}_{i}") + for i in range(depth + 1) + ] + + # This CTE will get all relevant dataset_ids from the target HDCA (id=v) and its nested elements. 
+ _hdca_target_cte = aliased(model.HistoryDatasetCollectionAssociation, name=f"_hdca_target_cte_{k}_{v}") + _target_collection_cte = aliased(model.DatasetCollection, name=f"_target_collection_cte_{k}_{v}") + _dce_cte = [aliased(model.DatasetCollectionElement, name=f"_dce_cte_{i}") for i in range(depth + 1)] + _hda_cte = aliased(model.HistoryDatasetAssociation, name=f"_hda_cte_{k}_{v}") + + reference_hda_cte_stmt = ( + select(_hda_cte.dataset_id) + .select_from(_hdca_target_cte) + .join(_target_collection_cte, _target_collection_cte.id == _hdca_target_cte.collection_id) + .join(_dce_cte[0], _dce_cte[0].dataset_collection_id == _target_collection_cte.id) + ) + + for i in range(1, depth + 1): + reference_hda_cte_stmt = reference_hda_cte_stmt.join( + _dce_cte[i], _dce_cte[i].dataset_collection_id == _dce_cte[i - 1].child_collection_id + ) - dce_left = [aliased(model.DatasetCollectionElement) for _ in range(depth + 1)] - dce_right = [aliased(model.DatasetCollectionElement) for _ in range(depth + 1)] - hda_left = aliased(model.HistoryDatasetAssociation) - hda_right = aliased(model.HistoryDatasetAssociation) + # Join to the leaf HDA for the right side within the CTE + _leaf_cte = _dce_cte[-1] + reference_hda_cte_stmt = reference_hda_cte_stmt.join(_hda_cte, _hda_cte.id == _leaf_cte.hda_id) - # Start joins from job → input HDCA → its collection → DCE + # Apply the constant filter for the target HDCA ID + reference_hda_cte_stmt = reference_hda_cte_stmt.where(_hdca_target_cte.id == v) + + # Define the CTE object + reference_hda = reference_hda_cte_stmt.cte(f"reference_hda_{k}_{v}") + + # Start joins from job → input HDCA → its collection → DCE (left side) labeled_col = a.dataset_collection_id.label(f"{k}_{v}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) @@ -761,14 +802,12 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na stmt = stmt.join(root_collection, root_collection.id == hdca_input.collection_id) stmt = stmt.join(dce_left[0], dce_left[0].dataset_collection_id == root_collection.id) - # Join to target HDCA (v), then to its collection and its first-level DCE - hdca_target = aliased(model.HistoryDatasetCollectionAssociation) - target_collection = aliased(model.DatasetCollection) + # Join to target HDCA (v) path and its first-level DCE (right side) stmt = stmt.join(hdca_target, hdca_target.id == v) stmt = stmt.join(target_collection, target_collection.id == hdca_target.collection_id) stmt = stmt.join(dce_right[0], dce_right[0].dataset_collection_id == target_collection.id) - # Parallel walk the structure + # Parallel walk the structure, comparing element_identifiers at each level for i in range(1, depth + 1): stmt = ( stmt.join(dce_left[i], dce_left[i].dataset_collection_id == dce_left[i - 1].child_collection_id) @@ -776,42 +815,77 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na .filter(dce_left[i].element_identifier == dce_right[i].element_identifier) ) - # Compare leaf-level HDAs + # Join to the leaf-level HDA for the left side in the main query leaf_left = dce_left[-1] - leaf_right = dce_right[-1] stmt = stmt.join(hda_left, hda_left.id == leaf_left.hda_id) - stmt = stmt.join(hda_right, hda_right.id == leaf_right.hda_id) data_conditions.append( and_( a.name == k, - hda_left.dataset_id == hda_right.dataset_id, + hda_left.dataset_id.in_(select(reference_hda.c.dataset_id)), # Use the CTE's dataset_ids ) ) - - used_ids.append(labeled_col) # input-side HDCA + used_ids.append(labeled_col) return 
stmt def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): dce = self.sa_session.get_one(model.DatasetCollectionElement, v) - if dce.child_collection: + if dce and dce.child_collection: depth = dce.child_collection.collection_type.count(":") + 1 else: depth = 0 - a = aliased(model.JobToInputDatasetCollectionElementAssociation) - dce_left = [aliased(model.DatasetCollectionElement) for _ in range(depth + 1)] - dce_right = [aliased(model.DatasetCollectionElement) for _ in range(depth + 1)] - hda_left = aliased(model.HistoryDatasetAssociation) - hda_right = aliased(model.HistoryDatasetAssociation) - # Base joins + # Aliases for the "left" side (job to input DCE path) + a = aliased(model.JobToInputDatasetCollectionElementAssociation, name=f"job_to_input_dce_association_{k}_{v}") + dce_left = [aliased(model.DatasetCollectionElement, name=f"dce_left_{k}_{v}_{i}") for i in range(depth + 1)] + hda_left = aliased(model.HistoryDatasetAssociation, name=f"hda_left_{k}_{v}") + + # Aliases for the "right" side (target DCE path in the main query) + dce_right = [aliased(model.DatasetCollectionElement, name=f"dce_right_{k}_{v}_{i}") for i in range(depth + 1)] + + # This CTE will get all relevant dataset_ids from the target DCE (id=v) and its nested elements. + _dce_cte_root = aliased(model.DatasetCollectionElement, name=f"_dce_cte_root_{k}_{v}") + _dce_cte_walk = [ + aliased(model.DatasetCollectionElement, name=f"_dce_cte_walk_{k}_{v}_{i}") for i in range(depth) + ] # One less, as _dce_cte_root is level 0 + _hda_cte = aliased(model.HistoryDatasetAssociation, name="_hda_cte_{k}_{v}") + + # Start the CTE query from the root DCE (id=v) + reference_dce_dataset_ids_cte_stmt = select(_hda_cte.dataset_id).select_from(_dce_cte_root) + + # Walk down the collection structure within the CTE + # The first join connects the root DCE to its potential child collection. 
+ if depth > 0: + reference_dce_dataset_ids_cte_stmt = reference_dce_dataset_ids_cte_stmt.join( + _dce_cte_walk[0], _dce_cte_walk[0].dataset_collection_id == _dce_cte_root.child_collection_id + ) + for i in range(1, depth): # Iterate for subsequent nested levels + reference_dce_dataset_ids_cte_stmt = reference_dce_dataset_ids_cte_stmt.join( + _dce_cte_walk[i], _dce_cte_walk[i].dataset_collection_id == _dce_cte_walk[i - 1].child_collection_id + ) + + # Join to the leaf HDA for the right side within the CTE + # The leaf DCE is either _dce_cte_root (if depth is 0) or the last _dce_cte_walk alias + leaf_cte = _dce_cte_walk[-1] if depth > 0 else _dce_cte_root + reference_dce_dataset_ids_cte_stmt = reference_dce_dataset_ids_cte_stmt.join( + _hda_cte, _hda_cte.id == leaf_cte.hda_id + ) + + reference_dce_dataset_ids_cte_stmt = reference_dce_dataset_ids_cte_stmt.where(_dce_cte_root.id == v) + + # Define the CTE object + reference_dce_dataset_ids = reference_dce_dataset_ids_cte_stmt.cte(f"reference_dce_dataset_ids_{k}_{v}") + + # Start joins from job → input DCE association → first-level DCE (left side) labeled_col = a.dataset_collection_element_id.label(f"{k}_{v}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) stmt = stmt.join(dce_left[0], dce_left[0].id == a.dataset_collection_element_id) + + # Join to target DCE (v) directly (right side) stmt = stmt.join(dce_right[0], dce_right[0].id == v) - # Parallel walk the collection structure + # Parallel walk the collection structure, comparing element_identifiers at each level for i in range(1, depth + 1): stmt = ( stmt.join(dce_left[i], dce_left[i].dataset_collection_id == dce_left[i - 1].child_collection_id) @@ -819,14 +893,16 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): .filter(dce_left[i].element_identifier == dce_right[i].element_identifier) ) - # Compare dataset_ids at the leaf level + # Join to the leaf-level HDA for the left side in the main query leaf_left = dce_left[-1] - leaf_right = dce_right[-1] stmt = stmt.join(hda_left, hda_left.id == leaf_left.hda_id) - stmt = stmt.join(hda_right, hda_right.id == leaf_right.hda_id) - - data_conditions.append(and_(a.name == k, hda_left.dataset_id == hda_right.dataset_id)) + data_conditions.append( + and_( + a.name == k, + hda_left.dataset_id.in_(select(reference_dce_dataset_ids.c.dataset_id)), # Use the CTE's dataset_ids + ) + ) used_ids.append(labeled_col) return stmt From d9e2dd5b971b02b8ad1fc6780136b50fa6bd2346 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sat, 24 May 2025 10:57:58 +0200 Subject: [PATCH 08/21] Improve performance of equivalent collection matching by using computed signatures of collections. --- lib/galaxy/managers/jobs.py | 184 ++++++++++++++++++++++++------------ 1 file changed, 123 insertions(+), 61 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index ef43665d618b..821b2d725be1 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -735,7 +735,26 @@ def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v): return stmt def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_name_match=True): - # Determine depth based on collection_type of the HDCA we're matching against + # Strategy for efficiently finding equivalent HDCAs: + # 1. Determine the structural depth of the target HDCA by its collection_type. + # 2. For the target HDCA (identified by 'v'): + # a. 
Dynamically construct Common Table Expressions (CTEs) to traverse its (potentially nested) structure down to individual datasets. + # b. Generate a "path signature string" for each dataset element, uniquely identifying its path within the collection. + # c. Aggregate these path strings into a canonical, sorted array (the "reference full signature") using array_agg with explicit ordering. + # 3. For all candidate HDCAs: + # a. Perform a similar dynamic traversal and path signature string generation. + # b. Aggregate these into sorted "full signature" arrays for each candidate HDCA. + # 4. Finally, identify equivalent HDCAs by comparing their full signature array directly against the reference full signature array. + # + # This approach is performant because: + # - It translates the complex problem of structural collection comparison into efficient array equality checks directly within the database. + # - It leverages the power of SQL CTEs and set-based operations, allowing the database query optimizer to find an efficient execution plan. + # - Joins required to traverse collection structures are built dynamically based on the actual depth, avoiding unnecessary complexity. + # - Signatures are computed and compared entirely on the database side, minimizing data transfer to the application. + # + # Note: CTEs are uniquely named using 'k' and 'v' to allow this logic to be embedded + # within larger queries or loops processing multiple target HDCAs. Aliases are used + # extensively to manage dynamic joins based on collection depth. collection_type = self.sa_session.scalar( select(model.DatasetCollection.collection_type) .select_from(model.HistoryDatasetCollectionAssociation) @@ -744,88 +763,131 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na ) depth = collection_type.count(":") if collection_type else 0 - # Aliases for the "left" side (input HDCA path) a = aliased(model.JobToInputDatasetCollectionAssociation, name=f"job_to_input_dataset_collection_1_{k}_{v}") hdca_input = aliased( model.HistoryDatasetCollectionAssociation, name=f"history_dataset_collection_association_1_{k}_{v}" ) - root_collection = aliased(model.DatasetCollection, name=f"dataset_collection_1_{k}_{v}") - dce_left = [ - aliased(model.DatasetCollectionElement, name=f"dataset_collection_element_left_{k}_{v}_{i}") - for i in range(depth + 1) - ] - hda_left = aliased(model.HistoryDatasetAssociation, name=f"history_dataset_association_1_{k}_{v}") - # Aliases for the "right" side (target HDCA path) in the main query - hdca_target = aliased( - model.HistoryDatasetCollectionAssociation, name=f"history_dataset_collection_association_2_{k}_{v}" - ) - target_collection = aliased(model.DatasetCollection, name=f"dataset_collection_2_{k}_{v}") - dce_right = [ - aliased(model.DatasetCollectionElement, name=f"dataset_collection_element_right_{k}_{v}_{i}") - for i in range(depth + 1) + _hdca_target_cte_ref = aliased(model.HistoryDatasetCollectionAssociation, name="_hdca_target_cte_ref") + _target_collection_cte_ref = aliased(model.DatasetCollection, name="_target_collection_cte_ref") + _dce_cte_ref_list = [ + aliased(model.DatasetCollectionElement, name=f"_dce_cte_ref_{i}") for i in range(depth + 1) ] - - # This CTE will get all relevant dataset_ids from the target HDCA (id=v) and its nested elements. 
- _hdca_target_cte = aliased(model.HistoryDatasetCollectionAssociation, name=f"_hdca_target_cte_{k}_{v}") - _target_collection_cte = aliased(model.DatasetCollection, name=f"_target_collection_cte_{k}_{v}") - _dce_cte = [aliased(model.DatasetCollectionElement, name=f"_dce_cte_{i}") for i in range(depth + 1)] - _hda_cte = aliased(model.HistoryDatasetAssociation, name=f"_hda_cte_{k}_{v}") - - reference_hda_cte_stmt = ( - select(_hda_cte.dataset_id) - .select_from(_hdca_target_cte) - .join(_target_collection_cte, _target_collection_cte.id == _hdca_target_cte.collection_id) - .join(_dce_cte[0], _dce_cte[0].dataset_collection_id == _target_collection_cte.id) + _hda_cte_ref = aliased(model.HistoryDatasetAssociation, name="_hda_cte_ref") + + # CTE 1: signature_elements_cte + signature_elements_cte = ( + select( + func.concat_ws( + ";", + *[_dce_cte_ref_list[i].element_identifier for i in range(depth + 1)], + _hda_cte_ref.dataset_id.cast(sqlalchemy.Text), + ).label("path_signature_string") + ) + .select_from(_hdca_target_cte_ref) + .join(_target_collection_cte_ref, _target_collection_cte_ref.id == _hdca_target_cte_ref.collection_id) + .join(_dce_cte_ref_list[0], _dce_cte_ref_list[0].dataset_collection_id == _target_collection_cte_ref.id) ) for i in range(1, depth + 1): - reference_hda_cte_stmt = reference_hda_cte_stmt.join( - _dce_cte[i], _dce_cte[i].dataset_collection_id == _dce_cte[i - 1].child_collection_id + signature_elements_cte = signature_elements_cte.join( + _dce_cte_ref_list[i], + _dce_cte_ref_list[i].dataset_collection_id == _dce_cte_ref_list[i - 1].child_collection_id, ) - # Join to the leaf HDA for the right side within the CTE - _leaf_cte = _dce_cte[-1] - reference_hda_cte_stmt = reference_hda_cte_stmt.join(_hda_cte, _hda_cte.id == _leaf_cte.hda_id) + _leaf_cte_ref = _dce_cte_ref_list[-1] + signature_elements_cte = signature_elements_cte.join(_hda_cte_ref, _hda_cte_ref.id == _leaf_cte_ref.hda_id) + signature_elements_cte = signature_elements_cte.where(_hdca_target_cte_ref.id == v).cte( + f"signature_elements_{k}_{v}" + ) - # Apply the constant filter for the target HDCA ID - reference_hda_cte_stmt = reference_hda_cte_stmt.where(_hdca_target_cte.id == v) + # CTE 2: reference_full_signature_cte + reference_full_signature_cte = ( + select( + func.array_agg( + sqlalchemy.text( + f"{signature_elements_cte.c.path_signature_string.name} ORDER BY {signature_elements_cte.c.path_signature_string.name}" + ) + ).label("signature_array") + ) + .select_from(signature_elements_cte) + .cte(f"reference_full_signature_{k}_{v}") + ) - # Define the CTE object - reference_hda = reference_hda_cte_stmt.cte(f"reference_hda_{k}_{v}") + candidate_hdca = aliased(model.HistoryDatasetCollectionAssociation, name="candidate_hdca") + candidate_root_collection = aliased(model.DatasetCollection, name="candidate_root_collection") + candidate_dce_list = [ + aliased(model.DatasetCollectionElement, name=f"candidate_dce_{i}") for i in range(depth + 1) + ] + candidate_hda = aliased(model.HistoryDatasetAssociation, name="candidate_hda") + + # CTE 3: candidate_signature_elements_cte + candidate_signature_elements_cte = ( + select( + candidate_hdca.id.label("candidate_hdca_id"), + func.concat_ws( + ";", + *[candidate_dce_list[i].element_identifier for i in range(depth + 1)], + candidate_hda.dataset_id.cast(sqlalchemy.Text), + ).label("path_signature_string"), + ) + .select_from(candidate_hdca) + .join(candidate_root_collection, candidate_root_collection.id == candidate_hdca.collection_id) + .join(candidate_dce_list[0], 
candidate_dce_list[0].dataset_collection_id == candidate_root_collection.id) + ) - # Start joins from job → input HDCA → its collection → DCE (left side) - labeled_col = a.dataset_collection_id.label(f"{k}_{v}") - stmt = stmt.add_columns(labeled_col) - stmt = stmt.join(a, a.job_id == model.Job.id) - stmt = stmt.join(hdca_input, hdca_input.id == a.dataset_collection_id) - stmt = stmt.join(root_collection, root_collection.id == hdca_input.collection_id) - stmt = stmt.join(dce_left[0], dce_left[0].dataset_collection_id == root_collection.id) + for i in range(1, depth + 1): + candidate_signature_elements_cte = candidate_signature_elements_cte.join( + candidate_dce_list[i], + candidate_dce_list[i].dataset_collection_id == candidate_dce_list[i - 1].child_collection_id, + ) - # Join to target HDCA (v) path and its first-level DCE (right side) - stmt = stmt.join(hdca_target, hdca_target.id == v) - stmt = stmt.join(target_collection, target_collection.id == hdca_target.collection_id) - stmt = stmt.join(dce_right[0], dce_right[0].dataset_collection_id == target_collection.id) + _leaf_candidate_dce = candidate_dce_list[-1] + candidate_signature_elements_cte = candidate_signature_elements_cte.join( + candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id + ).cte(f"candidate_signature_elements_{k}_{v}") + + # CTE 4: candidate_full_signatures_cte + # Use sqlalchemy.text() to explicitly include ORDER BY inside array_agg + candidate_full_signatures_cte = ( + select( + candidate_signature_elements_cte.c.candidate_hdca_id, + func.array_agg( + sqlalchemy.text( + f"{candidate_signature_elements_cte.c.path_signature_string.name} ORDER BY {candidate_signature_elements_cte.c.path_signature_string.name}" + ) + ).label("full_signature_array"), + ) + .select_from(candidate_signature_elements_cte) + .group_by(candidate_signature_elements_cte.c.candidate_hdca_id) + .cte(f"candidate_full_signatures_{k}_{v}") + ) - # Parallel walk the structure, comparing element_identifiers at each level - for i in range(1, depth + 1): - stmt = ( - stmt.join(dce_left[i], dce_left[i].dataset_collection_id == dce_left[i - 1].child_collection_id) - .join(dce_right[i], dce_right[i].dataset_collection_id == dce_right[i - 1].child_collection_id) - .filter(dce_left[i].element_identifier == dce_right[i].element_identifier) + # CTE 5: equivalent_hdca_ids_cte + equivalent_hdca_ids_cte = ( + select(candidate_full_signatures_cte.c.candidate_hdca_id.label("equivalent_id")) + .where( + candidate_full_signatures_cte.c.full_signature_array + == select(reference_full_signature_cte.c.signature_array).scalar_subquery() ) + .cte(f"equivalent_hdca_ids_{k}_{v}") + ) - # Join to the leaf-level HDA for the left side in the main query - leaf_left = dce_left[-1] - stmt = stmt.join(hda_left, hda_left.id == leaf_left.hda_id) + # Main query `stmt` construction + labeled_col = a.dataset_collection_id.label(f"{k}_{v}") + stmt = stmt.add_columns(labeled_col) + stmt = stmt.join(a, a.job_id == model.Job.id) - data_conditions.append( + stmt = stmt.join( + hdca_input, and_( - a.name == k, - hda_left.dataset_id.in_(select(reference_hda.c.dataset_id)), # Use the CTE's dataset_ids - ) + hdca_input.id == a.dataset_collection_id, + hdca_input.id.in_(select(equivalent_hdca_ids_cte.c.equivalent_id)), + ), ) + used_ids.append(labeled_col) + data_conditions.append(a.name == k) return stmt def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): From 6f2d620a6fd119727aa978b30f66c505b9074319 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 26 May 2025 18:16:41 
+0200 Subject: [PATCH 09/21] Implement candidate signature logic for DCEs as well --- lib/galaxy/managers/jobs.py | 282 ++++++++++++++++++++++++++---------- 1 file changed, 209 insertions(+), 73 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 821b2d725be1..ea330a9a928c 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -776,7 +776,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na _hda_cte_ref = aliased(model.HistoryDatasetAssociation, name="_hda_cte_ref") # CTE 1: signature_elements_cte - signature_elements_cte = ( + signature_elements_select = ( select( func.concat_ws( ";", @@ -790,16 +790,17 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na ) for i in range(1, depth + 1): - signature_elements_cte = signature_elements_cte.join( + signature_elements_select = signature_elements_select.join( _dce_cte_ref_list[i], _dce_cte_ref_list[i].dataset_collection_id == _dce_cte_ref_list[i - 1].child_collection_id, ) _leaf_cte_ref = _dce_cte_ref_list[-1] - signature_elements_cte = signature_elements_cte.join(_hda_cte_ref, _hda_cte_ref.id == _leaf_cte_ref.hda_id) - signature_elements_cte = signature_elements_cte.where(_hdca_target_cte_ref.id == v).cte( - f"signature_elements_{k}_{v}" + signature_elements_select = signature_elements_select.join( + _hda_cte_ref, _hda_cte_ref.id == _leaf_cte_ref.hda_id ) + signature_elements_select = signature_elements_select.where(_hdca_target_cte_ref.id == v) + signature_elements_cte = signature_elements_select.cte(f"signature_elements_{k}_{v}") # CTE 2: reference_full_signature_cte reference_full_signature_cte = ( @@ -822,7 +823,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na candidate_hda = aliased(model.HistoryDatasetAssociation, name="candidate_hda") # CTE 3: candidate_signature_elements_cte - candidate_signature_elements_cte = ( + candidate_signature_elements_select = ( select( candidate_hdca.id.label("candidate_hdca_id"), func.concat_ws( @@ -837,15 +838,18 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na ) for i in range(1, depth + 1): - candidate_signature_elements_cte = candidate_signature_elements_cte.join( + candidate_signature_elements_select = candidate_signature_elements_select.join( candidate_dce_list[i], candidate_dce_list[i].dataset_collection_id == candidate_dce_list[i - 1].child_collection_id, ) _leaf_candidate_dce = candidate_dce_list[-1] - candidate_signature_elements_cte = candidate_signature_elements_cte.join( + candidate_signature_elements_select = candidate_signature_elements_select.join( candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id - ).cte(f"candidate_signature_elements_{k}_{v}") + ) + candidate_signature_elements_cte = candidate_signature_elements_select.cte( + f"candidate_signature_elements_{k}_{v}" + ) # CTE 4: candidate_full_signatures_cte # Use sqlalchemy.text() to explicitly include ORDER BY inside array_agg @@ -891,82 +895,214 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na return stmt def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): - dce = self.sa_session.get_one(model.DatasetCollectionElement, v) - if dce and dce.child_collection: - depth = dce.child_collection.collection_type.count(":") + 1 - else: - depth = 0 - - # Aliases for the "left" side (job to input DCE path) - a = aliased(model.JobToInputDatasetCollectionElementAssociation, 
name=f"job_to_input_dce_association_{k}_{v}") - dce_left = [aliased(model.DatasetCollectionElement, name=f"dce_left_{k}_{v}_{i}") for i in range(depth + 1)] - hda_left = aliased(model.HistoryDatasetAssociation, name=f"hda_left_{k}_{v}") - - # Aliases for the "right" side (target DCE path in the main query) - dce_right = [aliased(model.DatasetCollectionElement, name=f"dce_right_{k}_{v}_{i}") for i in range(depth + 1)] - - # This CTE will get all relevant dataset_ids from the target DCE (id=v) and its nested elements. - _dce_cte_root = aliased(model.DatasetCollectionElement, name=f"_dce_cte_root_{k}_{v}") - _dce_cte_walk = [ - aliased(model.DatasetCollectionElement, name=f"_dce_cte_walk_{k}_{v}_{i}") for i in range(depth) - ] # One less, as _dce_cte_root is level 0 - _hda_cte = aliased(model.HistoryDatasetAssociation, name="_hda_cte_{k}_{v}") - - # Start the CTE query from the root DCE (id=v) - reference_dce_dataset_ids_cte_stmt = select(_hda_cte.dataset_id).select_from(_dce_cte_root) - - # Walk down the collection structure within the CTE - # The first join connects the root DCE to its potential child collection. - if depth > 0: - reference_dce_dataset_ids_cte_stmt = reference_dce_dataset_ids_cte_stmt.join( - _dce_cte_walk[0], _dce_cte_walk[0].dataset_collection_id == _dce_cte_root.child_collection_id + dce_root_target = self.sa_session.get_one(model.DatasetCollectionElement, v) + + # Determine if the target DCE points to an HDA or a child collection + if dce_root_target.child_collection_id: + # This DCE represents a collection, apply the signature comparison approach + target_collection_id = dce_root_target.child_collection_id + collection_type = self.sa_session.scalar( + select(model.DatasetCollection.collection_type).where( + model.DatasetCollection.id == target_collection_id + ) ) - for i in range(1, depth): # Iterate for subsequent nested levels - reference_dce_dataset_ids_cte_stmt = reference_dce_dataset_ids_cte_stmt.join( - _dce_cte_walk[i], _dce_cte_walk[i].dataset_collection_id == _dce_cte_walk[i - 1].child_collection_id + depth = collection_type.count(":") if collection_type else 0 + + # Aliases for the target DCE's collection structure + _dce_target_root_ref = aliased(model.DatasetCollectionElement, name=f"_dce_target_root_ref_{k}_{v}") + _dce_target_child_collection_ref = aliased( + model.DatasetCollection, name=f"_dce_target_child_collection_ref_{k}_{v}" + ) + _dce_target_level_list = [ + aliased(model.DatasetCollectionElement, name=f"_dce_target_level_{k}_{v}_{i}") for i in range(depth + 1) + ] # +1 for first element of child_collection + _hda_target_ref = aliased(model.HistoryDatasetAssociation, name=f"_hda_target_ref_{k}_{v}") + + # CTE 1: reference_dce_signature_elements_cte + # Path starts with the target DCE's own element_identifier, then recursively traverses its child collection + path_components = [ + _dce_target_root_ref.element_identifier, + *[_dce_target_level_list[i].element_identifier for i in range(depth + 1)], + _hda_target_ref.dataset_id.cast(sqlalchemy.Text), + ] + + reference_dce_signature_elements_select = ( + select(func.concat_ws(";", *path_components).label("path_signature_string")) + .select_from(_dce_target_root_ref) + .join( + _dce_target_child_collection_ref, + _dce_target_child_collection_ref.id == _dce_target_root_ref.child_collection_id, ) + .join( + _dce_target_level_list[0], + _dce_target_level_list[0].dataset_collection_id == _dce_target_child_collection_ref.id, + ) + ) - # Join to the leaf HDA for the right side within the CTE - # The leaf DCE 
is either _dce_cte_root (if depth is 0) or the last _dce_cte_walk alias - leaf_cte = _dce_cte_walk[-1] if depth > 0 else _dce_cte_root - reference_dce_dataset_ids_cte_stmt = reference_dce_dataset_ids_cte_stmt.join( - _hda_cte, _hda_cte.id == leaf_cte.hda_id - ) + for i in range(1, depth + 1): + reference_dce_signature_elements_select = reference_dce_signature_elements_select.join( + _dce_target_level_list[i], + _dce_target_level_list[i].dataset_collection_id + == _dce_target_level_list[i - 1].child_collection_id, + ) - reference_dce_dataset_ids_cte_stmt = reference_dce_dataset_ids_cte_stmt.where(_dce_cte_root.id == v) + _leaf_target_dce_ref = _dce_target_level_list[-1] + reference_dce_signature_elements_select = reference_dce_signature_elements_select.join( + _hda_target_ref, _hda_target_ref.id == _leaf_target_dce_ref.hda_id + ).where(_dce_target_root_ref.id == v) + reference_dce_signature_elements_cte = reference_dce_signature_elements_select.cte( + f"ref_dce_sig_els_{k}_{v}" + ) - # Define the CTE object - reference_dce_dataset_ids = reference_dce_dataset_ids_cte_stmt.cte(f"reference_dce_dataset_ids_{k}_{v}") + # CTE 2: reference_full_signature_cte + reference_full_signature_cte = ( + select( + func.array_agg( + sqlalchemy.text( + f"{reference_dce_signature_elements_cte.c.path_signature_string} ORDER BY {reference_dce_signature_elements_cte.c.path_signature_string}" + ) + ).label("signature_array") + ) + .select_from(reference_dce_signature_elements_cte) + .cte(f"ref_dce_full_sig_{k}_{v}") + ) - # Start joins from job → input DCE association → first-level DCE (left side) - labeled_col = a.dataset_collection_element_id.label(f"{k}_{v}") - stmt = stmt.add_columns(labeled_col) - stmt = stmt.join(a, a.job_id == model.Job.id) - stmt = stmt.join(dce_left[0], dce_left[0].id == a.dataset_collection_element_id) + # Candidate DCEs for comparison + candidate_dce_root = aliased(model.DatasetCollectionElement, name=f"candidate_dce_root_{k}_{v}") + candidate_dce_child_collection = aliased( + model.DatasetCollection, name=f"candidate_dce_child_collection_{k}_{v}" + ) + candidate_dce_level_list = [ + aliased(model.DatasetCollectionElement, name=f"candidate_dce_level_{k}_{v}_{i}") + for i in range(depth + 1) + ] + candidate_hda = aliased(model.HistoryDatasetAssociation, name=f"candidate_hda_{k}_{v}") + + # CTE 3: candidate_dce_signature_elements_cte + candidate_path_components = [ + candidate_dce_root.element_identifier, + *[candidate_dce_level_list[i].element_identifier for i in range(depth + 1)], + candidate_hda.dataset_id.cast(sqlalchemy.Text), + ] + + candidate_dce_signature_elements_select = ( + select( + candidate_dce_root.id.label("candidate_dce_id"), + func.concat_ws(";", *candidate_path_components).label("path_signature_string"), + ) + .select_from(candidate_dce_root) + # Filter for candidates that also point to a child collection of the same depth + .where(candidate_dce_root.child_collection_id.isnot(None)) + .join( + candidate_dce_child_collection, + candidate_dce_child_collection.id == candidate_dce_root.child_collection_id, + ) + .join( + candidate_dce_level_list[0], + candidate_dce_level_list[0].dataset_collection_id == candidate_dce_child_collection.id, + ) + ) + # Add dynamic joins for nested levels + for i in range(1, depth + 1): + candidate_dce_signature_elements_select = candidate_dce_signature_elements_select.join( + candidate_dce_level_list[i], + candidate_dce_level_list[i].dataset_collection_id + == candidate_dce_level_list[i - 1].child_collection_id, + ) - # Join to target DCE (v) 
directly (right side) - stmt = stmt.join(dce_right[0], dce_right[0].id == v) + _leaf_candidate_dce = candidate_dce_level_list[-1] + candidate_dce_signature_elements_select = candidate_dce_signature_elements_select.join( + candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id + ) + candidate_dce_signature_elements_cte = candidate_dce_signature_elements_select.cte( + f"cand_dce_sig_els_{k}_{v}" + ) - # Parallel walk the collection structure, comparing element_identifiers at each level - for i in range(1, depth + 1): - stmt = ( - stmt.join(dce_left[i], dce_left[i].dataset_collection_id == dce_left[i - 1].child_collection_id) - .join(dce_right[i], dce_right[i].dataset_collection_id == dce_right[i - 1].child_collection_id) - .filter(dce_left[i].element_identifier == dce_right[i].element_identifier) + # CTE 4: candidate_full_signatures_cte + candidate_full_signatures_cte = ( + select( + candidate_dce_signature_elements_cte.c.candidate_dce_id, + func.array_agg( + sqlalchemy.text( + f"{candidate_dce_signature_elements_cte.c.path_signature_string} ORDER BY {candidate_dce_signature_elements_cte.c.path_signature_string.name}" + ) + ).label("full_signature_array"), + ) + .select_from(candidate_dce_signature_elements_cte) + .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id) + .cte(f"cand_dce_full_sig_{k}_{v}") ) - # Join to the leaf-level HDA for the left side in the main query - leaf_left = dce_left[-1] - stmt = stmt.join(hda_left, hda_left.id == leaf_left.hda_id) + # CTE 5: equivalent_dce_ids_cte + equivalent_dce_ids_cte = ( + select(candidate_full_signatures_cte.c.candidate_dce_id.label("equivalent_id")) + .where( + candidate_full_signatures_cte.c.full_signature_array + == select(reference_full_signature_cte.c.signature_array).scalar_subquery() + ) + .cte(f"eq_dce_ids_{k}_{v}") + ) - data_conditions.append( - and_( - a.name == k, - hda_left.dataset_id.in_(select(reference_dce_dataset_ids.c.dataset_id)), # Use the CTE's dataset_ids + # Main query `stmt` construction for DCE collections + a = aliased( + model.JobToInputDatasetCollectionElementAssociation, name=f"job_to_input_dce_association_{k}_{v}" ) - ) - used_ids.append(labeled_col) - return stmt + labeled_col = a.dataset_collection_element_id.label(f"{k}_{v}") + stmt = stmt.add_columns(labeled_col) + stmt = stmt.join(a, a.job_id == model.Job.id) + + input_dce = aliased(model.DatasetCollectionElement) + + stmt = stmt.join( + input_dce, + and_( + input_dce.id == a.dataset_collection_element_id, + input_dce.id.in_(select(equivalent_dce_ids_cte.c.equivalent_id)), + ), + ) + + data_conditions.append(a.name == k) + used_ids.append(labeled_col) + return stmt + + else: # DCE points directly to an HDA (dce_root_target.hda_id is not None and child_collection_id is None) + # For this simple case, the full signature array comparison for nested collections doesn't apply. + # We can use a direct comparison of the HDA and element_identifier. + # This logic needs to align with how this type of DCE was previously matched. 
+ # Reverting to the original logic for single-HDA DCEs, but simplified for clarity: + + # Aliases for the "left" side (job to input DCE path) + a = aliased( + model.JobToInputDatasetCollectionElementAssociation, name=f"job_to_input_dce_association_{k}_{v}" + ) + dce_left = aliased(model.DatasetCollectionElement, name=f"dce_left_{k}_{v}") + hda_left = aliased(model.HistoryDatasetAssociation, name=f"hda_left_{k}_{v}") + + # Aliases for the "right" side (target DCE path in the main query) + dce_right = aliased(model.DatasetCollectionElement, name=f"dce_right_{k}_{v}") + hda_right = aliased(model.HistoryDatasetAssociation, name=f"hda_right_{k}_{v}") + + # Start joins from job → input DCE association → first-level DCE (left side) + labeled_col = a.dataset_collection_element_id.label(f"{k}_{v}") + stmt = stmt.add_columns(labeled_col) + stmt = stmt.join(a, a.job_id == model.Job.id) + stmt = stmt.join(dce_left, dce_left.id == a.dataset_collection_element_id) + stmt = stmt.join(hda_left, hda_left.id == dce_left.hda_id) # Join to HDA for left side + + # Join to target DCE (v) directly (right side) + stmt = stmt.join(dce_right, dce_right.id == v) + stmt = stmt.join(hda_right, hda_right.id == dce_right.hda_id) # Join to HDA for right side + + # Compare element identifiers and dataset IDs + data_conditions.append( + and_( + a.name == k, + dce_left.element_identifier == dce_right.element_identifier, + hda_left.dataset_id == hda_right.dataset_id, # Direct dataset_id comparison + ) + ) + used_ids.append(labeled_col) + return stmt def view_show_job(trans, job: Job, full: bool) -> Dict: From e75698352a0959e7952ba54ca11bdd295a543f1f Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 28 May 2025 19:00:00 +0200 Subject: [PATCH 10/21] Join to hdca / hda history to limit candidate search space --- lib/galaxy/managers/jobs.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index ea330a9a928c..98e91a60425f 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -481,9 +481,9 @@ def replace_dataset_ids(path, key, value): elif t == "ldda": stmt = self._build_stmt_for_ldda(stmt, data_conditions, used_ids, k, v) elif t == "hdca": - stmt = self._build_stmt_for_hdca(stmt, data_conditions, used_ids, k, v) + stmt = self._build_stmt_for_hdca(stmt, data_conditions, used_ids, k, v, user.id) elif t == "dce": - stmt = self._build_stmt_for_dce(stmt, data_conditions, used_ids, k, v) + stmt = self._build_stmt_for_dce(stmt, data_conditions, used_ids, k, v, user.id) else: log.error("Unknown input data type %s", t) return None @@ -734,7 +734,7 @@ def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v): used_ids.append(labeled_col) return stmt - def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_name_match=True): + def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, require_name_match=True): # Strategy for efficiently finding equivalent HDCAs: # 1. Determine the structural depth of the target HDCA by its collection_type. # 2. 
For the target HDCA (identified by 'v'): @@ -816,6 +816,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na ) candidate_hdca = aliased(model.HistoryDatasetCollectionAssociation, name="candidate_hdca") + candidate_hdca_history = aliased(model.History, name="candidate_hdca_history") candidate_root_collection = aliased(model.DatasetCollection, name="candidate_root_collection") candidate_dce_list = [ aliased(model.DatasetCollectionElement, name=f"candidate_dce_{i}") for i in range(depth + 1) @@ -833,8 +834,10 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na ).label("path_signature_string"), ) .select_from(candidate_hdca) + .join(candidate_hdca_history, candidate_hdca_history.id == candidate_hdca.history_id) .join(candidate_root_collection, candidate_root_collection.id == candidate_hdca.collection_id) .join(candidate_dce_list[0], candidate_dce_list[0].dataset_collection_id == candidate_root_collection.id) + .where(or_(candidate_hdca_history.user_id == user_id, candidate_hdca_history.published == true())) ) for i in range(1, depth + 1): @@ -894,7 +897,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na data_conditions.append(a.name == k) return stmt - def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): + def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): dce_root_target = self.sa_session.get_one(model.DatasetCollectionElement, v) # Determine if the target DCE points to an HDA or a child collection @@ -1011,8 +1014,12 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): ) _leaf_candidate_dce = candidate_dce_level_list[-1] - candidate_dce_signature_elements_select = candidate_dce_signature_elements_select.join( - candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id + candidate_dce_signature_elements_select = ( + candidate_dce_signature_elements_select.join( + candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id + ) + .join(model.History, model.History.id == candidate_hda.history_id) + .where(or_(model.History.published == true(), model.History.user_id == user_id)) ) candidate_dce_signature_elements_cte = candidate_dce_signature_elements_select.cte( f"cand_dce_sig_els_{k}_{v}" From e62691a37175717a071a66a15daf8c4b65dccd53 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 30 May 2025 14:03:14 +0200 Subject: [PATCH 11/21] Add pre-filtering step that narrows DCEs down to only those that share dataset_ids with original input DCE collection. 
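In other words: before computing per-element path signatures for every candidate, first collect the distinct dataset_ids reachable from the reference DCE and discard candidates whose leaf HDAs never touch that set. A minimal sketch of the pattern against a simplified, hypothetical one-table layout (not the Galaxy model; `element_datasets` and its columns are stand-ins):

```python
from sqlalchemy import Column, Integer, MetaData, Table, select

metadata = MetaData()
# Hypothetical flattened view: one row per (collection element, leaf dataset).
element_datasets = Table(
    "element_datasets",
    metadata,
    Column("dce_id", Integer),
    Column("dataset_id", Integer),
)

reference_dce_id = 42  # the input DCE whose equivalents we are looking for

# CTE 1: every dataset_id reachable from the reference element.
reference_dataset_ids = (
    select(element_datasets.c.dataset_id)
    .where(element_datasets.c.dce_id == reference_dce_id)
    .distinct()
    .cte("reference_dataset_ids")
)

# CTE 2: candidate elements that share at least one dataset_id with the
# reference; only these proceed to the full signature comparison.
candidate_dce_ids = (
    select(element_datasets.c.dce_id)
    .where(element_datasets.c.dataset_id.in_(select(reference_dataset_ids.c.dataset_id)))
    .distinct()
    .cte("candidate_dce_ids")
)
```

The patch below applies the same idea to the real nested collection tables, joining level by level down to the leaf HDAs instead of using a flattened view.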
--- lib/galaxy/managers/jobs.py | 213 ++++++++++++++++++++++++++++++------ 1 file changed, 180 insertions(+), 33 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 98e91a60425f..42d2d527280e 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -916,21 +916,61 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): _dce_target_child_collection_ref = aliased( model.DatasetCollection, name=f"_dce_target_child_collection_ref_{k}_{v}" ) + # List of aliases for each potential nested level of DatasetCollectionElements _dce_target_level_list = [ aliased(model.DatasetCollectionElement, name=f"_dce_target_level_{k}_{v}_{i}") for i in range(depth + 1) - ] # +1 for first element of child_collection + ] _hda_target_ref = aliased(model.HistoryDatasetAssociation, name=f"_hda_target_ref_{k}_{v}") - # CTE 1: reference_dce_signature_elements_cte - # Path starts with the target DCE's own element_identifier, then recursively traverses its child collection + # --- CTE: reference_dce_all_dataset_ids_cte --- + # This CTE (Common Table Expression) identifies all distinct dataset IDs + # that are part of the *reference* dataset collection (the one we're searching for). + # This helps in the initial filtering of candidate collections. + reference_all_dataset_ids_select = ( + select(_hda_target_ref.dataset_id.label("ref_dataset_id_for_overlap")) + .select_from(_dce_target_root_ref) + .join( + _dce_target_child_collection_ref, + _dce_target_child_collection_ref.id == _dce_target_root_ref.child_collection_id, + ) + .join( + _dce_target_level_list[0], + _dce_target_level_list[0].dataset_collection_id == _dce_target_child_collection_ref.id, + ) + ) + # Dynamically add joins for each nested level of the collection + for i in range(1, depth + 1): + reference_all_dataset_ids_select = reference_all_dataset_ids_select.join( + _dce_target_level_list[i], + _dce_target_level_list[i].dataset_collection_id + == _dce_target_level_list[i - 1].child_collection_id, + ) + _leaf_target_dce_ref = _dce_target_level_list[-1] + reference_all_dataset_ids_select = ( + reference_all_dataset_ids_select.join( + _hda_target_ref, _hda_target_ref.id == _leaf_target_dce_ref.hda_id + ) + .where(_dce_target_root_ref.id == v) + .distinct() + ) + reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{v}") + + # --- CTE: reference_dce_signature_elements_cte --- + # This CTE generates a "path signature string" for each individual element + # within the *reference* collection. This signature combines identifiers + # from all levels of the collection and the dataset ID, providing a unique + # identifier for each dataset's position within the collection structure. 
path_components = [ _dce_target_root_ref.element_identifier, *[_dce_target_level_list[i].element_identifier for i in range(depth + 1)], - _hda_target_ref.dataset_id.cast(sqlalchemy.Text), + _hda_target_ref.dataset_id.cast(sqlalchemy.Text), # Ensure type for concat_ws ] reference_dce_signature_elements_select = ( - select(func.concat_ws(";", *path_components).label("path_signature_string")) + select( + func.concat_ws(";", *path_components).label("path_signature_string"), + _hda_target_ref.dataset_id.label("raw_dataset_id_for_ordering"), # Keep original type for ordering + ) .select_from(_dce_target_root_ref) .join( _dce_target_child_collection_ref, @@ -957,20 +997,33 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): f"ref_dce_sig_els_{k}_{v}" ) - # CTE 2: reference_full_signature_cte + # --- CTE: reference_full_signature_cte --- + # This CTE aggregates the path signatures and dataset IDs of the *reference* + # collection into ordered arrays. These arrays form the "full signature" + # used for direct comparison with candidate collections. reference_full_signature_cte = ( select( func.array_agg( - sqlalchemy.text( - f"{reference_dce_signature_elements_cte.c.path_signature_string} ORDER BY {reference_dce_signature_elements_cte.c.path_signature_string}" - ) - ).label("signature_array") + reference_dce_signature_elements_cte.c.path_signature_string, + order_by=reference_dce_signature_elements_cte.c.path_signature_string, + ).label("signature_array"), + func.array_agg( + reference_dce_signature_elements_cte.c.raw_dataset_id_for_ordering.cast( + sqlalchemy.Integer + ), # Cast to Integer here + order_by=reference_dce_signature_elements_cte.c.path_signature_string, # Order by full path to ensure consistency + ).label("ordered_dataset_id_array"), + func.count(reference_dce_signature_elements_cte.c.path_signature_string).label( + "element_count" + ), # Count elements based on path_signature_string ) .select_from(reference_dce_signature_elements_cte) .cte(f"ref_dce_full_sig_{k}_{v}") ) - # Candidate DCEs for comparison + # --- Aliases for Candidate Dataset Collection Structure --- + # These aliases are used to represent potential matching dataset collections + # in the database, which will be compared against the reference. candidate_dce_root = aliased(model.DatasetCollectionElement, name=f"candidate_dce_root_{k}_{v}") candidate_dce_child_collection = aliased( model.DatasetCollection, name=f"candidate_dce_child_collection_{k}_{v}" @@ -980,21 +1033,66 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): for i in range(depth + 1) ] candidate_hda = aliased(model.HistoryDatasetAssociation, name=f"candidate_hda_{k}_{v}") + candidate_history = aliased(model.History, name=f"candidate_history_{k}_{v}") + + # --- CTE: candidate_dce_pre_filter_ids_cte (Initial Candidate Filtering) --- + # This CTE performs a first pass to quickly narrow down potential candidate + # dataset collections. It checks for: + # 1. Existence of a child collection (ensuring it's a collection, not a single HDA). + # 2. User permissions (published or owned by the current user). + # 3. Overlap in *any* dataset IDs with the reference collection. 
+ candidate_dce_pre_filter_ids_select = ( + select(candidate_dce_root.id.label("candidate_dce_id")) + .distinct() + .select_from(candidate_dce_root) + .where(candidate_dce_root.child_collection_id.isnot(None)) + .join( + candidate_dce_child_collection, + candidate_dce_child_collection.id == candidate_dce_root.child_collection_id, + ) + .join( + candidate_dce_level_list[0], + candidate_dce_level_list[0].dataset_collection_id == candidate_dce_child_collection.id, + ) + ) + for i in range(1, depth + 1): + candidate_dce_pre_filter_ids_select = candidate_dce_pre_filter_ids_select.join( + candidate_dce_level_list[i], + candidate_dce_level_list[i].dataset_collection_id + == candidate_dce_level_list[i - 1].child_collection_id, + ) + _leaf_candidate_dce_pre = candidate_dce_level_list[-1] + candidate_dce_pre_filter_ids_select = ( + candidate_dce_pre_filter_ids_select.join( + candidate_hda, candidate_hda.id == _leaf_candidate_dce_pre.hda_id + ) + .join(candidate_history, candidate_history.id == candidate_hda.history_id) + .where(or_(candidate_history.published == true(), candidate_history.user_id == user_id)) + .where(candidate_hda.dataset_id.in_(select(reference_all_dataset_ids_cte.c.ref_dataset_id_for_overlap))) + ) + candidate_dce_pre_filter_ids_cte = candidate_dce_pre_filter_ids_select.cte( + f"cand_dce_pre_filter_ids_{k}_{v}" + ) - # CTE 3: candidate_dce_signature_elements_cte - candidate_path_components = [ + # --- CTE: candidate_dce_signature_elements_cte --- + # This CTE calculates the path signature string and raw dataset ID for each + # element within the *pre-filtered candidate* collections. This is similar + # to the reference signature elements CTE but for the candidates. + candidate_path_components_fixed = [ candidate_dce_root.element_identifier, *[candidate_dce_level_list[i].element_identifier for i in range(depth + 1)], - candidate_hda.dataset_id.cast(sqlalchemy.Text), + candidate_hda.dataset_id.cast(sqlalchemy.Text), # Ensure type for concat_ws ] candidate_dce_signature_elements_select = ( select( candidate_dce_root.id.label("candidate_dce_id"), - func.concat_ws(";", *candidate_path_components).label("path_signature_string"), + func.concat_ws(";", *candidate_path_components_fixed).label("path_signature_string"), + candidate_hda.dataset_id.label("dataset_id_for_ordered_array"), # This is now Integer ) .select_from(candidate_dce_root) - # Filter for candidates that also point to a child collection of the same depth + # Apply the initial filter here! 
+ .where(candidate_dce_root.id.in_(select(candidate_dce_pre_filter_ids_cte.c.candidate_dce_id))) .where(candidate_dce_root.child_collection_id.isnot(None)) .join( candidate_dce_child_collection, @@ -1018,15 +1116,61 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): candidate_dce_signature_elements_select.join( candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id ) - .join(model.History, model.History.id == candidate_hda.history_id) - .where(or_(model.History.published == true(), model.History.user_id == user_id)) + .join(candidate_history, candidate_history.id == candidate_hda.history_id) + .where(or_(candidate_history.published == true(), candidate_history.user_id == user_id)) ) candidate_dce_signature_elements_cte = candidate_dce_signature_elements_select.cte( f"cand_dce_sig_els_{k}_{v}" ) - # CTE 4: candidate_full_signatures_cte - candidate_full_signatures_cte = ( + # --- CTE: candidate_pre_signatures_cte (Candidate Aggregation for Comparison) --- + # This CTE aggregates the dataset IDs from the candidate collections into + # ordered arrays, similar to `reference_full_signature_cte`. It also + # counts the elements to ensure size consistency. + candidate_pre_signatures_cte = ( + select( + candidate_dce_signature_elements_cte.c.candidate_dce_id, + # Corrected array_agg syntax: pass column directly, use order_by keyword + func.array_agg( + candidate_dce_signature_elements_cte.c.dataset_id_for_ordered_array.cast( + sqlalchemy.Integer + ), # Cast explicitly + order_by=candidate_dce_signature_elements_cte.c.path_signature_string, # Order by the full path + ).label("candidate_ordered_dataset_ids_array"), + func.count(candidate_dce_signature_elements_cte.c.candidate_dce_id).label( + "candidate_element_count" + ), + ) + .select_from(candidate_dce_signature_elements_cte) + .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id) + .cte(f"cand_dce_pre_sig_{k}_{v}") + ) + + # --- CTE: filtered_cand_dce_by_dataset_ids_cte (Filtering by Element Count and Dataset ID Array) --- + # This crucial CTE filters the candidate collections further by comparing: + # 1. Their total element count with the reference collection's element count. + # 2. Their ordered array of dataset IDs with the reference's ordered array. + # This step ensures that candidate collections have the same number of elements + # and contain the exact same datasets, in the same logical order (based on path). + filtered_cand_dce_by_dataset_ids_cte = ( + select(candidate_pre_signatures_cte.c.candidate_dce_id) + .select_from(candidate_pre_signatures_cte, reference_full_signature_cte) + .where( + and_( + candidate_pre_signatures_cte.c.candidate_element_count + == reference_full_signature_cte.c.element_count, + candidate_pre_signatures_cte.c.candidate_ordered_dataset_ids_array + == reference_full_signature_cte.c.ordered_dataset_id_array, + ) + ) + .cte(f"filtered_cand_dce_{k}_{v}") + ) + + # --- CTE: final_candidate_signatures_cte (Final Full Signature Calculation for Matched Candidates) --- + # For the candidates that passed the previous filtering, this CTE calculates + # their full path signature array. This signature represents the complete + # structural and content identity of the collection. 
+ final_candidate_signatures_cte = ( select( candidate_dce_signature_elements_cte.c.candidate_dce_id, func.array_agg( @@ -1036,21 +1180,18 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): ).label("full_signature_array"), ) .select_from(candidate_dce_signature_elements_cte) - .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id) - .cte(f"cand_dce_full_sig_{k}_{v}") - ) - - # CTE 5: equivalent_dce_ids_cte - equivalent_dce_ids_cte = ( - select(candidate_full_signatures_cte.c.candidate_dce_id.label("equivalent_id")) .where( - candidate_full_signatures_cte.c.full_signature_array - == select(reference_full_signature_cte.c.signature_array).scalar_subquery() + candidate_dce_signature_elements_cte.c.candidate_dce_id.in_( + select(filtered_cand_dce_by_dataset_ids_cte.c.candidate_dce_id) + ) ) - .cte(f"eq_dce_ids_{k}_{v}") + .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id) + .cte(f"final_cand_dce_full_sig_{k}_{v}") ) - # Main query `stmt` construction for DCE collections + # --- Main Query Construction for Dataset Collection Elements --- + # This section joins the main `stmt` (representing jobs) with the CTEs + # to filter jobs whose input DCE matches the reference DCE's full signature. a = aliased( model.JobToInputDatasetCollectionElementAssociation, name=f"job_to_input_dce_association_{k}_{v}" ) @@ -1064,7 +1205,14 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): input_dce, and_( input_dce.id == a.dataset_collection_element_id, - input_dce.id.in_(select(equivalent_dce_ids_cte.c.equivalent_id)), + # The final filter: ensure the input DCE's ID is among those candidates + # whose full signature array *exactly matches* the reference's signature array. + input_dce.id.in_( + select(final_candidate_signatures_cte.c.candidate_dce_id).where( + final_candidate_signatures_cte.c.full_signature_array + == select(reference_full_signature_cte.c.signature_array).scalar_subquery() + ) + ), ), ) @@ -1076,7 +1224,6 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): # For this simple case, the full signature array comparison for nested collections doesn't apply. # We can use a direct comparison of the HDA and element_identifier. # This logic needs to align with how this type of DCE was previously matched. - # Reverting to the original logic for single-HDA DCEs, but simplified for clarity: # Aliases for the "left" side (job to input DCE path) a = aliased( From d4982f48e810a878c110b10d30f1a11bcae5c8b6 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 30 May 2025 15:28:36 +0200 Subject: [PATCH 12/21] Add pre-filtering step for search by hdca --- lib/galaxy/managers/jobs.py | 79 ++++++++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 2 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 42d2d527280e..1a8434b55a68 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -775,7 +775,36 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r ] _hda_cte_ref = aliased(model.HistoryDatasetAssociation, name="_hda_cte_ref") - # CTE 1: signature_elements_cte + # --- NEW CTE: reference_hdca_all_dataset_ids_cte --- + # This CTE identifies all distinct dataset IDs that are part of the *reference* + # History Dataset Collection Association (HDCA). This is used for an initial, + # fast pre-filtering of candidate HDCAs. 
+ reference_all_dataset_ids_select = ( + select(_hda_cte_ref.dataset_id.label("ref_dataset_id_for_overlap")) + .select_from(_hdca_target_cte_ref) + .join(_target_collection_cte_ref, _target_collection_cte_ref.id == _hdca_target_cte_ref.collection_id) + .join(_dce_cte_ref_list[0], _dce_cte_ref_list[0].dataset_collection_id == _target_collection_cte_ref.id) + ) + + for i in range(1, depth + 1): + reference_all_dataset_ids_select = reference_all_dataset_ids_select.join( + _dce_cte_ref_list[i], + _dce_cte_ref_list[i].dataset_collection_id == _dce_cte_ref_list[i - 1].child_collection_id, + ) + + _leaf_cte_ref = _dce_cte_ref_list[-1] + reference_all_dataset_ids_select = ( + reference_all_dataset_ids_select.join(_hda_cte_ref, _hda_cte_ref.id == _leaf_cte_ref.hda_id) + .where(_hdca_target_cte_ref.id == v) + .distinct() + ) + reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{v}") + # --- END NEW CTE --- + + # CTE 1: signature_elements_cte (for the reference HDCA) + # This CTE generates a unique "path signature string" for each dataset element + # within the reference HDCA. This string identifies the element's position + # and content within the nested collection structure. signature_elements_select = ( select( func.concat_ws( @@ -803,6 +832,8 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r signature_elements_cte = signature_elements_select.cte(f"signature_elements_{k}_{v}") # CTE 2: reference_full_signature_cte + # This CTE aggregates the path signature strings of the reference HDCA into a + # canonical, sorted array. This array represents the complete "signature" of the collection. reference_full_signature_cte = ( select( func.array_agg( @@ -823,7 +854,42 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r ] candidate_hda = aliased(model.HistoryDatasetAssociation, name="candidate_hda") + # --- NEW CTE: candidate_hdca_pre_filter_ids_cte (First Pass Candidate Filtering) --- + # This CTE performs a quick initial filter on candidate HDCAs. + # It checks for: + # 1. User permissions (published or owned by the current user). + # 2. Whether the candidate HDCA contains any dataset IDs that are also present + # in the reference HDCA (an overlap check). This is a broad filter to + # reduce the number of candidates before more expensive signature generation. 
+ candidate_hdca_pre_filter_ids_select = ( + select(candidate_hdca.id.label("candidate_hdca_id")) + .distinct() + .select_from(candidate_hdca) + .join(candidate_hdca_history, candidate_hdca_history.id == candidate_hdca.history_id) + .join(candidate_root_collection, candidate_root_collection.id == candidate_hdca.collection_id) + .join(candidate_dce_list[0], candidate_dce_list[0].dataset_collection_id == candidate_root_collection.id) + ) + + for i in range(1, depth + 1): + candidate_hdca_pre_filter_ids_select = candidate_hdca_pre_filter_ids_select.join( + candidate_dce_list[i], + candidate_dce_list[i].dataset_collection_id == candidate_dce_list[i - 1].child_collection_id, + ) + + _leaf_candidate_dce_pre = candidate_dce_list[-1] + candidate_hdca_pre_filter_ids_select = ( + candidate_hdca_pre_filter_ids_select.join(candidate_hda, candidate_hda.id == _leaf_candidate_dce_pre.hda_id) + .where(or_(candidate_hdca_history.user_id == user_id, candidate_hdca_history.published == true())) + .where(candidate_hda.dataset_id.in_(select(reference_all_dataset_ids_cte.c.ref_dataset_id_for_overlap))) + ) + candidate_hdca_pre_filter_ids_cte = candidate_hdca_pre_filter_ids_select.cte( + f"cand_hdca_pre_filter_ids_{k}_{v}" + ) + # --- END NEW CTE --- + # CTE 3: candidate_signature_elements_cte + # This CTE generates the path signature string for each element of the + # *pre-filtered candidate* HDCAs. candidate_signature_elements_select = ( select( candidate_hdca.id.label("candidate_hdca_id"), @@ -834,6 +900,8 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r ).label("path_signature_string"), ) .select_from(candidate_hdca) + # Apply the pre-filter here to limit the candidates for full signature generation + .where(candidate_hdca.id.in_(select(candidate_hdca_pre_filter_ids_cte.c.candidate_hdca_id))) .join(candidate_hdca_history, candidate_hdca_history.id == candidate_hdca.history_id) .join(candidate_root_collection, candidate_root_collection.id == candidate_hdca.collection_id) .join(candidate_dce_list[0], candidate_dce_list[0].dataset_collection_id == candidate_root_collection.id) @@ -855,7 +923,8 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r ) # CTE 4: candidate_full_signatures_cte - # Use sqlalchemy.text() to explicitly include ORDER BY inside array_agg + # This CTE aggregates the path signature strings for the candidate HDCAs into + # ordered arrays, similar to the reference's full signature. candidate_full_signatures_cte = ( select( candidate_signature_elements_cte.c.candidate_hdca_id, @@ -871,6 +940,8 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r ) # CTE 5: equivalent_hdca_ids_cte + # This final CTE identifies the HDCAs that are truly "equivalent" by + # comparing their full signature array to the reference HDCA's full signature array. equivalent_hdca_ids_cte = ( select(candidate_full_signatures_cte.c.candidate_hdca_id.label("equivalent_id")) .where( @@ -881,6 +952,8 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r ) # Main query `stmt` construction + # This section joins the base job statement with the associations and then filters + # by the HDCAs identified as equivalent in the CTEs. 
labeled_col = a.dataset_collection_id.label(f"{k}_{v}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) @@ -889,6 +962,8 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r hdca_input, and_( hdca_input.id == a.dataset_collection_id, + # Filter the main query to only include HDCAs found in the + # 'equivalent_hdca_ids_cte'. hdca_input.id.in_(select(equivalent_hdca_ids_cte.c.equivalent_id)), ), ) From 27cbd678bb6ada7ca49a70f3b111932005e68a10 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sun, 1 Jun 2025 15:55:54 +0200 Subject: [PATCH 13/21] Force job_id materialization --- lib/galaxy/managers/jobs.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 1a8434b55a68..d0d7f7af4a71 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -560,11 +560,11 @@ def _filter_jobs( self, stmt, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump ): """Build subquery that selects a job with correct job parameters.""" - subquery_alias = stmt.subquery("job_ids_subquery") - outer_select_columns = [subquery_alias.c[col.name] for col in stmt.selected_columns] - stmt = select(*outer_select_columns).select_from(subquery_alias) + job_ids_materialized_cte = stmt.cte("job_ids_cte").prefix_with("MATERIALIZED") + outer_select_columns = [job_ids_materialized_cte.c[col.name] for col in stmt.selected_columns] + stmt = select(*outer_select_columns).select_from(job_ids_materialized_cte) stmt = ( - stmt.join(model.Job, model.Job.id == subquery_alias.c.job_id) + stmt.join(model.Job, model.Job.id == job_ids_materialized_cte.c.job_id) .join(model.History, model.Job.history_id == model.History.id) .where( and_( From 575e7f734c57d187bcdef11af68784dda2dafe43 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sun, 1 Jun 2025 21:13:57 +0200 Subject: [PATCH 14/21] Replace history_dataset_association_history subquery with outer join This is way more efficient. 
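The rewrite, in the abstract: a predicate of the form `hda.id IN (SELECT … FROM hda_history WHERE …)` is re-expressed as an outer join on the same conditions plus an `IS NOT NULL` check on the joined table's key. A rough sketch of the transformation with toy tables (illustrative names only, not the real schema):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, and_, select

metadata = MetaData()
hda = Table("hda", metadata, Column("id", Integer), Column("name", String))
hda_history = Table(
    "hda_history",
    metadata,
    Column("hda_id", Integer),
    Column("name", String),
)

# Before: correlated subquery, re-evaluated for every candidate row.
subquery_form = select(hda.c.id).where(
    hda.c.id.in_(select(hda_history.c.hda_id).where(hda_history.c.name == hda.c.name))
)

# After: same predicates as a LEFT OUTER JOIN; a row qualifies when the
# join found a matching history record (joined key is not NULL).
join_form = (
    select(hda.c.id)
    .outerjoin(
        hda_history,
        and_(hda_history.c.hda_id == hda.c.id, hda_history.c.name == hda.c.name),
    )
    .where(hda_history.c.hda_id.isnot(None))
)
```

One caveat with this pattern: an outer join can emit several rows when more than one history record matches, whereas `IN` never does; the surrounding job-search statement aggregates per job, so the extra rows should not change which jobs are returned.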
--- lib/galaxy/managers/jobs.py | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index d0d7f7af4a71..578901d3fea0 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -674,12 +674,15 @@ def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, stmt = stmt.add_columns(labeled_col) used_ids.append(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) - hda_stmt = select(model.HistoryDatasetAssociation.id).where( - model.HistoryDatasetAssociation.id == e.history_dataset_association_id - ) # b is the HDA used for the job stmt = stmt.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id) name_condition = [] + hda_history_join_conditions = [ + e.history_dataset_association_id == b.id, + e.extension == c.extension, + e._metadata == c._metadata, + e.version == a.dataset_version, + ] if identifier: stmt = stmt.join(d) data_conditions.append( @@ -689,19 +692,10 @@ def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, ) ) elif require_name_match: - hda_stmt = hda_stmt.where(e.name == c.name) + hda_history_join_conditions.append(e.name == c.name) name_condition.append(b.name == c.name) - hda_stmt = ( - hda_stmt.where( - e.extension == c.extension, - ) - .where( - a.dataset_version == e.version, - ) - .where( - e._metadata == c._metadata, - ) - ) + + stmt = stmt.outerjoin(e, and_(*hda_history_join_conditions)) data_conditions.append( and_( a.name == k, @@ -718,7 +712,7 @@ def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, b.metadata == c.metadata, *name_condition, ), - b.id.in_(hda_stmt), + e.history_dataset_association_id.isnot(None), ), or_(b.deleted == false(), c.deleted == false()), ) From 8e8d8999a349e0968e74dd6fdd99bcf2e788b1b5 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sun, 1 Jun 2025 23:08:55 +0200 Subject: [PATCH 15/21] Materialize unordered results, then order by job id Otherwise the query planner decides on a merge join on job_ids_cte.job_id = job.id. 
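Put differently: compute the candidate job rows once into a CTE the planner is told to materialize, and only then apply `ORDER BY job_id DESC` in an outer select, so the sort runs over the small materialized result instead of steering the join strategy. A simplified sketch of the construction (toy `job` table, not the real statement; a later patch in this series additionally restricts the `MATERIALIZED` keyword to the PostgreSQL dialect):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, select

metadata = MetaData()
job = Table("job", metadata, Column("id", Integer), Column("state", String))

# Inner, unordered statement (stands in for the real job-search select).
inner = select(job.c.id.label("job_id")).where(job.c.state == "ok")

# Fence the planner: materialize the unordered rows, then sort them.
unordered_results = inner.cte("unordered_results").prefix_with("MATERIALIZED")
final_ordered = (
    select(*unordered_results.c)
    .select_from(unordered_results)
    .order_by(unordered_results.c.job_id.desc())
)
```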
--- lib/galaxy/managers/jobs.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 578901d3fea0..cb6fd2a0591e 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -661,8 +661,14 @@ def _exclude_jobs_with_deleted_outputs(self, stmt): ~deleted_collection_exists, # NOT EXISTS deleted collection ~deleted_dataset_exists, # NOT EXISTS deleted dataset ) - ).order_by(job_id_from_subquery.desc()) - return outer_stmt + ) + unordered_results_cte = outer_stmt.cte("unordered_results").prefix_with("MATERIALIZED") + final_ordered_stmt = ( + select(*unordered_results_cte.c) + .select_from(unordered_results_cte) + .order_by(unordered_results_cte.c.job_id.desc()) + ) + return final_ordered_stmt def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, require_name_match=True): a = aliased(model.JobToInputDatasetAssociation) From 9d3268e7215f5baa32dc2f938fc3e5d86d2f6c7e Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 2 Jun 2025 08:40:38 +0200 Subject: [PATCH 16/21] Ensure query columns are unique --- lib/galaxy/managers/jobs.py | 115 ++++++++++++++++++------------ lib/galaxy_test/api/test_tools.py | 17 +++++ 2 files changed, 86 insertions(+), 46 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index cb6fd2a0591e..89980d744fee 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -466,9 +466,9 @@ def replace_dataset_ids(path, key, value): data_types = [] used_ids: List = [] for k, input_list in input_data.items(): - # k will be matched against the JobParameter.name column. This can be prefixed depending on whethter + # k will be matched against the JobParameter.name column. 
This can be prefixed depending on whether # the input is in a repeat, or not (section and conditional) - for type_values in input_list: + for value_index, type_values in enumerate(input_list): t = type_values["src"] v = type_values["id"] requested_ids.append(v) @@ -476,14 +476,25 @@ def replace_dataset_ids(path, key, value): identifier = type_values["identifier"] if t == "hda": stmt = self._build_stmt_for_hda( - stmt, data_conditions, used_ids, k, v, identifier, require_name_match=require_name_match + stmt, + data_conditions, + used_ids, + k, + v, + identifier, + require_name_match=require_name_match, + value_index=value_index, ) elif t == "ldda": - stmt = self._build_stmt_for_ldda(stmt, data_conditions, used_ids, k, v) + stmt = self._build_stmt_for_ldda(stmt, data_conditions, used_ids, k, v, value_index=value_index) elif t == "hdca": - stmt = self._build_stmt_for_hdca(stmt, data_conditions, used_ids, k, v, user.id) + stmt = self._build_stmt_for_hdca( + stmt, data_conditions, used_ids, k, v, user.id, value_index=value_index + ) elif t == "dce": - stmt = self._build_stmt_for_dce(stmt, data_conditions, used_ids, k, v, user.id) + stmt = self._build_stmt_for_dce( + stmt, data_conditions, used_ids, k, v, user.id, value_index=value_index + ) else: log.error("Unknown input data type %s", t) return None @@ -670,13 +681,15 @@ def _exclude_jobs_with_deleted_outputs(self, stmt): ) return final_ordered_stmt - def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, require_name_match=True): + def _build_stmt_for_hda( + self, stmt, data_conditions, used_ids, k, v, identifier, value_index, require_name_match=True + ): a = aliased(model.JobToInputDatasetAssociation) b = aliased(model.HistoryDatasetAssociation) c = aliased(model.HistoryDatasetAssociation) d = aliased(model.JobParameter) e = aliased(model.HistoryDatasetAssociationHistory) - labeled_col = a.dataset_id.label(f"{k}_{v}") + labeled_col = a.dataset_id.label(f"{k}_{value_index}") stmt = stmt.add_columns(labeled_col) used_ids.append(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) @@ -725,16 +738,18 @@ def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, ) return stmt - def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v): + def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v, value_index): a = aliased(model.JobToInputLibraryDatasetAssociation) - labeled_col = a.ldda_id.label(f"{k}_{v}") + labeled_col = a.ldda_id.label(f"{k}_{value_index}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) data_conditions.append(and_(a.name == k, a.ldda_id == v)) used_ids.append(labeled_col) return stmt - def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, require_name_match=True): + def _build_stmt_for_hdca( + self, stmt, data_conditions, used_ids, k, v, user_id, value_index, require_name_match=True + ): # Strategy for efficiently finding equivalent HDCAs: # 1. Determine the structural depth of the target HDCA by its collection_type. # 2. 
For the target HDCA (identified by 'v'): @@ -763,9 +778,12 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r ) depth = collection_type.count(":") if collection_type else 0 - a = aliased(model.JobToInputDatasetCollectionAssociation, name=f"job_to_input_dataset_collection_1_{k}_{v}") + a = aliased( + model.JobToInputDatasetCollectionAssociation, name=f"job_to_input_dataset_collection_1_{k}_{value_index}" + ) hdca_input = aliased( - model.HistoryDatasetCollectionAssociation, name=f"history_dataset_collection_association_1_{k}_{v}" + model.HistoryDatasetCollectionAssociation, + name=f"history_dataset_collection_association_1_{k}_{value_index}", ) _hdca_target_cte_ref = aliased(model.HistoryDatasetCollectionAssociation, name="_hdca_target_cte_ref") @@ -798,7 +816,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r .where(_hdca_target_cte_ref.id == v) .distinct() ) - reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{v}") + reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{value_index}") # --- END NEW CTE --- # CTE 1: signature_elements_cte (for the reference HDCA) @@ -829,7 +847,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r _hda_cte_ref, _hda_cte_ref.id == _leaf_cte_ref.hda_id ) signature_elements_select = signature_elements_select.where(_hdca_target_cte_ref.id == v) - signature_elements_cte = signature_elements_select.cte(f"signature_elements_{k}_{v}") + signature_elements_cte = signature_elements_select.cte(f"signature_elements_{k}_{value_index}") # CTE 2: reference_full_signature_cte # This CTE aggregates the path signature strings of the reference HDCA into a @@ -843,7 +861,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r ).label("signature_array") ) .select_from(signature_elements_cte) - .cte(f"reference_full_signature_{k}_{v}") + .cte(f"reference_full_signature_{k}_{value_index}") ) candidate_hdca = aliased(model.HistoryDatasetCollectionAssociation, name="candidate_hdca") @@ -883,7 +901,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r .where(candidate_hda.dataset_id.in_(select(reference_all_dataset_ids_cte.c.ref_dataset_id_for_overlap))) ) candidate_hdca_pre_filter_ids_cte = candidate_hdca_pre_filter_ids_select.cte( - f"cand_hdca_pre_filter_ids_{k}_{v}" + f"cand_hdca_pre_filter_ids_{k}_{value_index}" ) # --- END NEW CTE --- @@ -919,7 +937,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id ) candidate_signature_elements_cte = candidate_signature_elements_select.cte( - f"candidate_signature_elements_{k}_{v}" + f"candidate_signature_elements_{k}_{value_index}" ) # CTE 4: candidate_full_signatures_cte @@ -936,7 +954,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r ) .select_from(candidate_signature_elements_cte) .group_by(candidate_signature_elements_cte.c.candidate_hdca_id) - .cte(f"candidate_full_signatures_{k}_{v}") + .cte(f"candidate_full_signatures_{k}_{value_index}") ) # CTE 5: equivalent_hdca_ids_cte @@ -948,13 +966,13 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r candidate_full_signatures_cte.c.full_signature_array == select(reference_full_signature_cte.c.signature_array).scalar_subquery() ) - .cte(f"equivalent_hdca_ids_{k}_{v}") + 
.cte(f"equivalent_hdca_ids_{k}_{value_index}") ) # Main query `stmt` construction # This section joins the base job statement with the associations and then filters # by the HDCAs identified as equivalent in the CTEs. - labeled_col = a.dataset_collection_id.label(f"{k}_{v}") + labeled_col = a.dataset_collection_id.label(f"{k}_{value_index}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) @@ -972,7 +990,7 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, r data_conditions.append(a.name == k) return stmt - def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): + def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id, value_index): dce_root_target = self.sa_session.get_one(model.DatasetCollectionElement, v) # Determine if the target DCE points to an HDA or a child collection @@ -987,15 +1005,18 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): depth = collection_type.count(":") if collection_type else 0 # Aliases for the target DCE's collection structure - _dce_target_root_ref = aliased(model.DatasetCollectionElement, name=f"_dce_target_root_ref_{k}_{v}") + _dce_target_root_ref = aliased( + model.DatasetCollectionElement, name=f"_dce_target_root_ref_{k}_{value_index}" + ) _dce_target_child_collection_ref = aliased( - model.DatasetCollection, name=f"_dce_target_child_collection_ref_{k}_{v}" + model.DatasetCollection, name=f"_dce_target_child_collection_ref_{k}_{value_index}" ) # List of aliases for each potential nested level of DatasetCollectionElements _dce_target_level_list = [ - aliased(model.DatasetCollectionElement, name=f"_dce_target_level_{k}_{v}_{i}") for i in range(depth + 1) + aliased(model.DatasetCollectionElement, name=f"_dce_target_level_{k}_{value_index}_{i}") + for i in range(depth + 1) ] - _hda_target_ref = aliased(model.HistoryDatasetAssociation, name=f"_hda_target_ref_{k}_{v}") + _hda_target_ref = aliased(model.HistoryDatasetAssociation, name=f"_hda_target_ref_{k}_{value_index}") # --- CTE: reference_dce_all_dataset_ids_cte --- # This CTE (Common Table Expression) identifies all distinct dataset IDs @@ -1028,7 +1049,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): .where(_dce_target_root_ref.id == v) .distinct() ) - reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{v}") + reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{value_index}") # --- CTE: reference_dce_signature_elements_cte --- # This CTE generates a "path signature string" for each individual element @@ -1069,7 +1090,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): _hda_target_ref, _hda_target_ref.id == _leaf_target_dce_ref.hda_id ).where(_dce_target_root_ref.id == v) reference_dce_signature_elements_cte = reference_dce_signature_elements_select.cte( - f"ref_dce_sig_els_{k}_{v}" + f"ref_dce_sig_els_{k}_{value_index}" ) # --- CTE: reference_full_signature_cte --- @@ -1093,7 +1114,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): ), # Count elements based on path_signature_string ) .select_from(reference_dce_signature_elements_cte) - .cte(f"ref_dce_full_sig_{k}_{v}") + .cte(f"ref_dce_full_sig_{k}_{value_index}") ) # --- Aliases for Candidate Dataset Collection Structure --- @@ -1101,14 +1122,14 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): # in the 
database, which will be compared against the reference. candidate_dce_root = aliased(model.DatasetCollectionElement, name=f"candidate_dce_root_{k}_{v}") candidate_dce_child_collection = aliased( - model.DatasetCollection, name=f"candidate_dce_child_collection_{k}_{v}" + model.DatasetCollection, name=f"candidate_dce_child_collection_{k}_{value_index}" ) candidate_dce_level_list = [ - aliased(model.DatasetCollectionElement, name=f"candidate_dce_level_{k}_{v}_{i}") + aliased(model.DatasetCollectionElement, name=f"candidate_dce_level_{k}_{value_index}_{i}") for i in range(depth + 1) ] - candidate_hda = aliased(model.HistoryDatasetAssociation, name=f"candidate_hda_{k}_{v}") - candidate_history = aliased(model.History, name=f"candidate_history_{k}_{v}") + candidate_hda = aliased(model.HistoryDatasetAssociation, name=f"candidate_hda_{k}_{value_index}") + candidate_history = aliased(model.History, name=f"candidate_history_{k}_{value_index}") # --- CTE: candidate_dce_pre_filter_ids_cte (Initial Candidate Filtering) --- # This CTE performs a first pass to quickly narrow down potential candidate @@ -1146,7 +1167,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): .where(candidate_hda.dataset_id.in_(select(reference_all_dataset_ids_cte.c.ref_dataset_id_for_overlap))) ) candidate_dce_pre_filter_ids_cte = candidate_dce_pre_filter_ids_select.cte( - f"cand_dce_pre_filter_ids_{k}_{v}" + f"cand_dce_pre_filter_ids_{k}_{value_index}" ) # --- CTE: candidate_dce_signature_elements_cte --- @@ -1195,7 +1216,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): .where(or_(candidate_history.published == true(), candidate_history.user_id == user_id)) ) candidate_dce_signature_elements_cte = candidate_dce_signature_elements_select.cte( - f"cand_dce_sig_els_{k}_{v}" + f"cand_dce_sig_els_{k}_{value_index}" ) # --- CTE: candidate_pre_signatures_cte (Candidate Aggregation for Comparison) --- @@ -1218,7 +1239,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): ) .select_from(candidate_dce_signature_elements_cte) .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id) - .cte(f"cand_dce_pre_sig_{k}_{v}") + .cte(f"cand_dce_pre_sig_{k}_{value_index}") ) # --- CTE: filtered_cand_dce_by_dataset_ids_cte (Filtering by Element Count and Dataset ID Array) --- @@ -1238,7 +1259,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): == reference_full_signature_cte.c.ordered_dataset_id_array, ) ) - .cte(f"filtered_cand_dce_{k}_{v}") + .cte(f"filtered_cand_dce_{k}_{value_index}") ) # --- CTE: final_candidate_signatures_cte (Final Full Signature Calculation for Matched Candidates) --- @@ -1261,16 +1282,17 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): ) ) .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id) - .cte(f"final_cand_dce_full_sig_{k}_{v}") + .cte(f"final_cand_dce_full_sig_{k}_{value_index}") ) # --- Main Query Construction for Dataset Collection Elements --- # This section joins the main `stmt` (representing jobs) with the CTEs # to filter jobs whose input DCE matches the reference DCE's full signature. 
a = aliased( - model.JobToInputDatasetCollectionElementAssociation, name=f"job_to_input_dce_association_{k}_{v}" + model.JobToInputDatasetCollectionElementAssociation, + name=f"job_to_input_dce_association_{k}_{value_index}", ) - labeled_col = a.dataset_collection_element_id.label(f"{k}_{v}") + labeled_col = a.dataset_collection_element_id.label(f"{k}_{value_index}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) @@ -1302,17 +1324,18 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id): # Aliases for the "left" side (job to input DCE path) a = aliased( - model.JobToInputDatasetCollectionElementAssociation, name=f"job_to_input_dce_association_{k}_{v}" + model.JobToInputDatasetCollectionElementAssociation, + name=f"job_to_input_dce_association_{k}_{value_index}", ) - dce_left = aliased(model.DatasetCollectionElement, name=f"dce_left_{k}_{v}") - hda_left = aliased(model.HistoryDatasetAssociation, name=f"hda_left_{k}_{v}") + dce_left = aliased(model.DatasetCollectionElement, name=f"dce_left_{k}_{value_index}") + hda_left = aliased(model.HistoryDatasetAssociation, name=f"hda_left_{k}_{value_index}") # Aliases for the "right" side (target DCE path in the main query) - dce_right = aliased(model.DatasetCollectionElement, name=f"dce_right_{k}_{v}") - hda_right = aliased(model.HistoryDatasetAssociation, name=f"hda_right_{k}_{v}") + dce_right = aliased(model.DatasetCollectionElement, name=f"dce_right_{k}_{value_index}") + hda_right = aliased(model.HistoryDatasetAssociation, name=f"hda_right_{k}_{value_index}") # Start joins from job → input DCE association → first-level DCE (left side) - labeled_col = a.dataset_collection_element_id.label(f"{k}_{v}") + labeled_col = a.dataset_collection_element_id.label(f"{k}_{value_index}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) stmt = stmt.join(dce_left, dce_left.id == a.dataset_collection_element_id) diff --git a/lib/galaxy_test/api/test_tools.py b/lib/galaxy_test/api/test_tools.py index 6e47c5e14271..09a0d812bbfd 100644 --- a/lib/galaxy_test/api/test_tools.py +++ b/lib/galaxy_test/api/test_tools.py @@ -1097,6 +1097,23 @@ def test_run_cat1_use_cached_job_renamed_input(self): job_details = self.dataset_populator.get_job_details(copied_job_id, full=True).json() assert job_details["copied_from_job_id"] == outputs_one["jobs"][0]["id"] + @skip_without_tool("cat_list") + @requires_new_history + def test_run_cat_list_use_cached_job_repeated_input(self): + with self.dataset_populator.test_history_for( + self.test_run_cat_list_use_cached_job_repeated_input + ) as history_id: + # Run simple non-upload tool with an input data parameter. 
+ input_value = dataset_to_param(self.dataset_populator.new_dataset(history_id=history_id)) + inputs = {"input1": {"batch": False, "values": [input_value, input_value]}} + outputs_one = self._run("cat_list", history_id, inputs, assert_ok=True, wait_for_job=True) + outputs_two = self._run( + "cat_list", history_id, inputs, assert_ok=True, wait_for_job=True, use_cached_job=True + ) + copied_job_id = outputs_two["jobs"][0]["id"] + job_details = self.dataset_populator.get_job_details(copied_job_id, full=True).json() + assert job_details["copied_from_job_id"] == outputs_one["jobs"][0]["id"] + @skip_without_tool("collection_creates_list") @requires_new_history def test_run_collection_creates_list_use_cached_job(self): From 2d98fbf25fed40b11e0e3ea16e0f049a8d075849 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 2 Jun 2025 09:02:51 +0200 Subject: [PATCH 17/21] Limit materialized hint to postgresql --- lib/galaxy/managers/jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 89980d744fee..151182f90d8e 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -571,7 +571,7 @@ def _filter_jobs( self, stmt, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump ): """Build subquery that selects a job with correct job parameters.""" - job_ids_materialized_cte = stmt.cte("job_ids_cte").prefix_with("MATERIALIZED") + job_ids_materialized_cte = stmt.cte("job_ids_cte").prefix_with("MATERIALIZED", dialect="postgresql") outer_select_columns = [job_ids_materialized_cte.c[col.name] for col in stmt.selected_columns] stmt = select(*outer_select_columns).select_from(job_ids_materialized_cte) stmt = ( @@ -673,7 +673,7 @@ def _exclude_jobs_with_deleted_outputs(self, stmt): ~deleted_dataset_exists, # NOT EXISTS deleted dataset ) ) - unordered_results_cte = outer_stmt.cte("unordered_results").prefix_with("MATERIALIZED") + unordered_results_cte = outer_stmt.cte("unordered_results").prefix_with("MATERIALIZED", dialect="postgresql") final_ordered_stmt = ( select(*unordered_results_cte.c) .select_from(unordered_results_cte) From 615d1c79ae09dc94ee077c0a3b023b3937af2e80 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 2 Jun 2025 10:21:24 +0200 Subject: [PATCH 18/21] Make search work on sqlite and postgres --- lib/galaxy/managers/jobs.py | 56 +++++++++++++++---------------------- 1 file changed, 23 insertions(+), 33 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 151182f90d8e..03e79fbc89d5 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -366,6 +366,7 @@ def __init__( id_encoding_helper: IdEncodingHelper, ): self.sa_session = sa_session + self.dialect_name = sa_session.get_bind().dialect.name self.hda_manager = hda_manager self.dataset_collection_manager = dataset_collection_manager self.ldda_manager = ldda_manager @@ -747,6 +748,12 @@ def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v, value_inde used_ids.append(labeled_col) return stmt + def agg_expression(self, column): + if self.dialect_name == "sqlite": + return func.group_concat(column) + else: + return func.array_agg(column, order_by=column) + def _build_stmt_for_hdca( self, stmt, data_conditions, used_ids, k, v, user_id, value_index, require_name_match=True ): @@ -853,13 +860,7 @@ def _build_stmt_for_hdca( # This CTE aggregates the path signature strings of the reference HDCA into a # canonical, sorted array. 
This array represents the complete "signature" of the collection. reference_full_signature_cte = ( - select( - func.array_agg( - sqlalchemy.text( - f"{signature_elements_cte.c.path_signature_string.name} ORDER BY {signature_elements_cte.c.path_signature_string.name}" - ) - ).label("signature_array") - ) + select(self.agg_expression(signature_elements_cte.c.path_signature_string).label("signature_array")) .select_from(signature_elements_cte) .cte(f"reference_full_signature_{k}_{value_index}") ) @@ -946,11 +947,9 @@ def _build_stmt_for_hdca( candidate_full_signatures_cte = ( select( candidate_signature_elements_cte.c.candidate_hdca_id, - func.array_agg( - sqlalchemy.text( - f"{candidate_signature_elements_cte.c.path_signature_string.name} ORDER BY {candidate_signature_elements_cte.c.path_signature_string.name}" - ) - ).label("full_signature_array"), + self.agg_expression(candidate_signature_elements_cte.c.path_signature_string).label( + "full_signature_array" + ), ) .select_from(candidate_signature_elements_cte) .group_by(candidate_signature_elements_cte.c.candidate_hdca_id) @@ -1099,16 +1098,12 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id, va # used for direct comparison with candidate collections. reference_full_signature_cte = ( select( - func.array_agg( - reference_dce_signature_elements_cte.c.path_signature_string, - order_by=reference_dce_signature_elements_cte.c.path_signature_string, - ).label("signature_array"), - func.array_agg( - reference_dce_signature_elements_cte.c.raw_dataset_id_for_ordering.cast( - sqlalchemy.Integer - ), # Cast to Integer here - order_by=reference_dce_signature_elements_cte.c.path_signature_string, # Order by full path to ensure consistency - ).label("ordered_dataset_id_array"), + self.agg_expression(reference_dce_signature_elements_cte.c.path_signature_string).label( + "signature_array" + ), + self.agg_expression(reference_dce_signature_elements_cte.c.raw_dataset_id_for_ordering).label( + "ordered_dataset_id_array" + ), func.count(reference_dce_signature_elements_cte.c.path_signature_string).label( "element_count" ), # Count elements based on path_signature_string @@ -1227,12 +1222,9 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id, va select( candidate_dce_signature_elements_cte.c.candidate_dce_id, # Corrected array_agg syntax: pass column directly, use order_by keyword - func.array_agg( - candidate_dce_signature_elements_cte.c.dataset_id_for_ordered_array.cast( - sqlalchemy.Integer - ), # Cast explicitly - order_by=candidate_dce_signature_elements_cte.c.path_signature_string, # Order by the full path - ).label("candidate_ordered_dataset_ids_array"), + self.agg_expression(candidate_dce_signature_elements_cte.c.dataset_id_for_ordered_array).label( + "candidate_ordered_dataset_ids_array" + ), func.count(candidate_dce_signature_elements_cte.c.candidate_dce_id).label( "candidate_element_count" ), @@ -1269,11 +1261,9 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id, va final_candidate_signatures_cte = ( select( candidate_dce_signature_elements_cte.c.candidate_dce_id, - func.array_agg( - sqlalchemy.text( - f"{candidate_dce_signature_elements_cte.c.path_signature_string} ORDER BY {candidate_dce_signature_elements_cte.c.path_signature_string.name}" - ) - ).label("full_signature_array"), + self.agg_expression(candidate_dce_signature_elements_cte.c.path_signature_string).label( + "full_signature_array" + ), ) .select_from(candidate_dce_signature_elements_cte) .where( 
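The `agg_expression` helper introduced in this patch is what keeps those signature aggregates portable: on PostgreSQL it builds a real ordered array, while SQLite falls back to `GROUP_CONCAT`. A rough standalone sketch of that dispatch, using assumed names and PostgreSQL's `aggregate_order_by` construct rather than the exact keyword form used in the patch:

```
from sqlalchemy import func
from sqlalchemy.dialects.postgresql import aggregate_order_by


def agg_expression(column, dialect_name: str):
    """Aggregate a column into a single value that can be compared as a whole."""
    if dialect_name == "sqlite":
        # SQLite has no array type; GROUP_CONCAT collapses the group into one
        # string (SQLite itself does not guarantee the concatenation order).
        return func.group_concat(column)
    # PostgreSQL: aggregate into an array ordered by the column itself, so the
    # result is canonical regardless of the order rows are produced in.
    return func.array_agg(aggregate_order_by(column, column))


# Usage against a hypothetical CTE column:
# select(agg_expression(sig_cte.c.path_signature_string, "postgresql").label("signature_array"))
```

Because the same expression is applied to both the reference and the candidate side, the two aggregates remain directly comparable on either backend.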
From 829f566dae8af59cc18ca5e1606ff2d46842f93f Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 2 Jun 2025 12:42:34 +0200 Subject: [PATCH 19/21] Drop unnecessary materialize hint --- lib/galaxy/managers/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 03e79fbc89d5..fc22f2e28053 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -572,7 +572,7 @@ def _filter_jobs( self, stmt, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump ): """Build subquery that selects a job with correct job parameters.""" - job_ids_materialized_cte = stmt.cte("job_ids_cte").prefix_with("MATERIALIZED", dialect="postgresql") + job_ids_materialized_cte = stmt.cte("job_ids_cte") outer_select_columns = [job_ids_materialized_cte.c[col.name] for col in stmt.selected_columns] stmt = select(*outer_select_columns).select_from(job_ids_materialized_cte) stmt = ( From 7fb58fd4ccc41894d27cb721fd932d5cc15699c7 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 2 Jun 2025 13:14:39 +0200 Subject: [PATCH 20/21] Restore compatibility with postgresql < 12 --- lib/galaxy/managers/jobs.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index fc22f2e28053..2ed463a9c5b8 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -367,11 +367,28 @@ def __init__( ): self.sa_session = sa_session self.dialect_name = sa_session.get_bind().dialect.name + self.use_materialized_hint = self.supports_materialized_hint() self.hda_manager = hda_manager self.dataset_collection_manager = dataset_collection_manager self.ldda_manager = ldda_manager self.decode_id = id_encoding_helper.decode_id + def supports_materialized_hint(self) -> bool: + """ + Checks if the connected PostgreSQL database version supports the MATERIALIZED hint. + (PostgreSQL 12 and higher support it). + """ + # session.bind refers to the Engine or Connection the session is bound to + # dialect provides information about the database being used + # server_version_info returns a tuple (major, minor, micro, ...) + # e.g., (12, 5) for PostgreSQL 12.5, (13, 2) for PostgreSQL 13.2 + if self.dialect_name == "postgresql": + bind = self.sa_session.get_bind() + server_version_info = bind.dialect and bind.dialect.server_version_info + if server_version_info: + return server_version_info[0] >= 12 + return False + def by_tool_input( self, user: User, @@ -674,7 +691,11 @@ def _exclude_jobs_with_deleted_outputs(self, stmt): ~deleted_dataset_exists, # NOT EXISTS deleted dataset ) ) - unordered_results_cte = outer_stmt.cte("unordered_results").prefix_with("MATERIALIZED", dialect="postgresql") + unordered_results_cte = outer_stmt.cte("unordered_results") + if self.use_materialized_hint: + # This can be considerably faster with large job tables, + # but is only available on postgresql >= 12.
+ unordered_results_cte = unordered_results_cte.prefix_with("MATERIALIZED") final_ordered_stmt = ( select(*unordered_results_cte.c) .select_from(unordered_results_cte) From 32914f2e07d3b69ab6e31350180869c34cf908a6 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 11 Jun 2025 20:47:43 +0200 Subject: [PATCH 21/21] Add test that confirms element sorting must match --- lib/galaxy_test/api/test_tools.py | 35 +++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/lib/galaxy_test/api/test_tools.py b/lib/galaxy_test/api/test_tools.py index 09a0d812bbfd..60cd250a56a3 100644 --- a/lib/galaxy_test/api/test_tools.py +++ b/lib/galaxy_test/api/test_tools.py @@ -1056,6 +1056,41 @@ def test_run_cat1_use_cached_job(self): assert len(filenames) == 3, filenames assert len(set(filenames)) <= 2, filenames + @skip_without_tool("cat_list") + @skip_without_tool("__SORTLIST__") + def test_run_cat_list_hdca_sort_order_respected_use_cached_job(self): + with self.dataset_populator.test_history_for( + self.test_run_cat_list_hdca_sort_order_respected_use_cached_job + ) as history_id: + fetch_response = self.dataset_collection_populator.create_list_in_history( + history_id, wait=True, contents=[("C", "3"), ("B", "2"), ("A", "1")] + ).json() + hdca_not_sorted_id = fetch_response["output_collections"][0]["id"] + result = self._run( + tool_id="__SORTLIST__", + history_id=history_id, + inputs={"input": {"src": "hdca", "id": hdca_not_sorted_id}}, + assert_ok=True, + ) + hdca_sorted_id = result["output_collections"][0]["id"] + self.dataset_populator.get_history_collection_details(history_id, content_id=hdca_sorted_id) + hdca_sorted = self.dataset_populator.get_history_collection_details(history_id, content_id=hdca_sorted_id) + hdca_not_sorted = self.dataset_populator.get_history_collection_details( + history_id, content_id=hdca_not_sorted_id ) + assert hdca_sorted["elements"][0]["object"]["name"] == "A" + assert hdca_not_sorted["elements"][0]["object"]["name"] == "C" + self._run("cat_list", history_id, inputs={"input1": {"src": "hdca", "id": hdca_sorted_id}}, assert_ok=True) + job = self._run( + "cat_list", + history_id, + inputs={"input1": {"src": "hdca", "id": hdca_not_sorted_id}}, + assert_ok=True, + use_cached_job=True, + ) + job_details = self.dataset_populator.get_job_details(job["jobs"][0]["id"], full=True).json() + assert not job_details["copied_from_job_id"] + @skip_without_tool("cat1") @requires_new_history def test_run_cat1_use_cached_job_from_public_history(self):
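Taken together, patches 19 and 20 apply the `MATERIALIZED` keyword only when `supports_materialized_hint()` reports a backend that understands it. A compact sketch of the same gating outside the Galaxy codebase, with assumed table and function names:

```
from sqlalchemy import column, select, table
from sqlalchemy.engine import Engine

job = table("job", column("id"), column("state"))  # stand-in for the real model


def supports_materialized_hint(engine: Engine) -> bool:
    # server_version_info is populated once the engine has connected;
    # only PostgreSQL 12+ understands MATERIALIZED / NOT MATERIALIZED.
    if engine.dialect.name != "postgresql":
        return False
    version = engine.dialect.server_version_info
    return bool(version) and version[0] >= 12


def cached_job_candidates(use_materialized_hint: bool):
    stmt = select(job.c.id).where(job.c.state == "ok")
    cte = stmt.cte("unordered_results")
    if use_materialized_hint:
        # Renders "WITH unordered_results AS MATERIALIZED (...)", asking the
        # planner to evaluate the CTE once instead of inlining it.
        cte = cte.prefix_with("MATERIALIZED")
    return select(cte.c.id).order_by(cte.c.id.desc())


print(cached_job_candidates(use_materialized_hint=False))
```

The sorting test added in patch 21 then pins down the semantics the signature arrays are meant to guarantee: a collection containing the same datasets in a different element order must not be treated as a cache hit.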