diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index b173bbcd14ec..2ed463a9c5b8 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -24,6 +24,7 @@ ) from sqlalchemy import ( and_, + exists, false, func, null, @@ -365,11 +366,29 @@ def __init__( id_encoding_helper: IdEncodingHelper, ): self.sa_session = sa_session + self.dialect_name = sa_session.get_bind().dialect.name + self.use_materialized_hint = self.supports_materialized_hint() self.hda_manager = hda_manager self.dataset_collection_manager = dataset_collection_manager self.ldda_manager = ldda_manager self.decode_id = id_encoding_helper.decode_id + def supports_materialized_hint(self) -> bool: + """ + Checks if the connected PostgreSQL database version supports the MATERIALIZED hint. + (PostgreSQL 12 and higher support it). + """ + # session.bind refers to the Engine or Connection the session is bound to + # dialect provides information about the database being used + # server_version_info returns a tuple (major, minor, micro, ...) + # e.g., (12, 5) for PostgreSQL 12.5, (13, 2) for PostgreSQL 13.2 + if self.dialect_name == "postgresql": + bind = self.sa_session.get_bind() + server_version_info = bind.dialect and bind.dialect.server_version_info + if server_version_info: + return server_version_info[0] >= 12 + return False + def by_tool_input( self, user: User, @@ -453,7 +472,7 @@ def replace_dataset_ids(path, key, value): return key, value return key, value - stmt = self._build_job_query(tool_id, user.id, tool_version, job_state, wildcard_param_dump) + stmt = select(model.Job.id.label("job_id")) data_conditions: List = [] @@ -465,10 +484,9 @@ def replace_dataset_ids(path, key, value): data_types = [] used_ids: List = [] for k, input_list in input_data.items(): - # k will be matched against the JobParameter.name column. This can be prefixed depending on whethter + # k will be matched against the JobParameter.name column. 
This can be prefixed depending on whether # the input is in a repeat, or not (section and conditional) - k = {k, k.split("|")[-1]} - for type_values in input_list: + for value_index, type_values in enumerate(input_list): t = type_values["src"] v = type_values["id"] requested_ids.append(v) @@ -476,19 +494,32 @@ def replace_dataset_ids(path, key, value): identifier = type_values["identifier"] if t == "hda": stmt = self._build_stmt_for_hda( - stmt, data_conditions, used_ids, k, v, identifier, require_name_match=require_name_match + stmt, + data_conditions, + used_ids, + k, + v, + identifier, + require_name_match=require_name_match, + value_index=value_index, ) elif t == "ldda": - stmt = self._build_stmt_for_ldda(stmt, data_conditions, used_ids, k, v) + stmt = self._build_stmt_for_ldda(stmt, data_conditions, used_ids, k, v, value_index=value_index) elif t == "hdca": - stmt = self._build_stmt_for_hdca(stmt, data_conditions, used_ids, k, v) + stmt = self._build_stmt_for_hdca( + stmt, data_conditions, used_ids, k, v, user.id, value_index=value_index + ) elif t == "dce": - stmt = self._build_stmt_for_dce(stmt, data_conditions, used_ids, k, v) + stmt = self._build_stmt_for_dce( + stmt, data_conditions, used_ids, k, v, user.id, value_index=value_index + ) else: log.error("Unknown input data type %s", t) return None - stmt = stmt.where(*data_conditions).group_by(model.Job.id, *used_ids).order_by(model.Job.id.desc()) + stmt = stmt.where(*data_conditions).group_by(model.Job.id, *used_ids) + stmt = self._filter_jobs(stmt, tool_id, user.id, tool_version, job_state, wildcard_param_dump) + stmt = self._exclude_jobs_with_deleted_outputs(stmt) for job in self.sa_session.execute(stmt): # We found a job that is equal in terms of tool_id, user, state and input datasets, @@ -554,10 +585,15 @@ def replace_dataset_ids(path, key, value): log.info("No equivalent jobs found %s", search_timer) return None - def _build_job_query(self, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump): + def _filter_jobs( + self, stmt, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump + ): """Build subquery that selects a job with correct job parameters.""" + job_ids_materialized_cte = stmt.cte("job_ids_cte") + outer_select_columns = [job_ids_materialized_cte.c[col.name] for col in stmt.selected_columns] + stmt = select(*outer_select_columns).select_from(job_ids_materialized_cte) stmt = ( - select(model.Job.id) + stmt.join(model.Job, model.Job.id == job_ids_materialized_cte.c.job_id) .join(model.History, model.Job.history_id == model.History.id) .where( and_( @@ -590,14 +626,6 @@ def _build_job_query(self, tool_id: str, user_id: int, tool_version: Optional[st job_states = {Job.states.SKIPPED} stmt = stmt.where(Job.state.in_(job_states)) - # exclude jobs with deleted outputs - stmt = stmt.where( - and_( - model.Job.any_output_dataset_collection_instances_deleted == false(), - model.Job.any_output_dataset_deleted == false(), - ) - ) - for k, v in wildcard_param_dump.items(): if v == {"__class__": "RuntimeValue"}: # TODO: verify this is always None. e.g. 
run with runtime input input
@@ -633,46 +661,85 @@ def _build_job_query(self, tool_id: str, user_id: int, tool_version: Optional[st
 
         return stmt
 
-    def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, require_name_match=True):
+    def _exclude_jobs_with_deleted_outputs(self, stmt):
+        subquery_alias = stmt.subquery("filtered_jobs_subquery")
+        outer_select_columns = [subquery_alias.c[col.name] for col in stmt.selected_columns]
+        outer_stmt = select(*outer_select_columns).select_from(subquery_alias)
+        job_id_from_subquery = subquery_alias.c.job_id
+        deleted_collection_exists = exists().where(
+            and_(
+                model.JobToOutputDatasetCollectionAssociation.job_id == job_id_from_subquery,
+                model.JobToOutputDatasetCollectionAssociation.dataset_collection_id
+                == model.HistoryDatasetCollectionAssociation.id,
+                model.HistoryDatasetCollectionAssociation.deleted == true(),
+            )
+        )
+
+        # Subquery for deleted output datasets
+        deleted_dataset_exists = exists().where(
+            and_(
+                model.JobToOutputDatasetAssociation.job_id == job_id_from_subquery,
+                model.JobToOutputDatasetAssociation.dataset_id == model.HistoryDatasetAssociation.id,
+                model.HistoryDatasetAssociation.deleted == true(),
+            )
+        )
+
+        # Exclude jobs where a deleted collection OR a deleted dataset exists
+        outer_stmt = outer_stmt.where(
+            and_(
+                ~deleted_collection_exists,  # NOT EXISTS deleted collection
+                ~deleted_dataset_exists,  # NOT EXISTS deleted dataset
+            )
+        )
+        unordered_results_cte = outer_stmt.cte("unordered_results")
+        if self.use_materialized_hint:
+            # This can be considerably faster with large job tables,
+            # but is only available on PostgreSQL >= 12.
+            unordered_results_cte = unordered_results_cte.prefix_with("MATERIALIZED")
+        final_ordered_stmt = (
+            select(*unordered_results_cte.c)
+            .select_from(unordered_results_cte)
+            .order_by(unordered_results_cte.c.job_id.desc())
+        )
+        return final_ordered_stmt
+
+    def _build_stmt_for_hda(
+        self, stmt, data_conditions, used_ids, k, v, identifier, value_index, require_name_match=True
+    ):
         a = aliased(model.JobToInputDatasetAssociation)
         b = aliased(model.HistoryDatasetAssociation)
         c = aliased(model.HistoryDatasetAssociation)
         d = aliased(model.JobParameter)
         e = aliased(model.HistoryDatasetAssociationHistory)
-        stmt = stmt.add_columns(a.dataset_id)
-        used_ids.append(a.dataset_id)
+        labeled_col = a.dataset_id.label(f"{k}_{value_index}")
+        stmt = stmt.add_columns(labeled_col)
+        used_ids.append(labeled_col)
         stmt = stmt.join(a, a.job_id == model.Job.id)
-        hda_stmt = select(model.HistoryDatasetAssociation.id).where(
-            model.HistoryDatasetAssociation.id == e.history_dataset_association_id
-        )
         # b is the HDA used for the job
         stmt = stmt.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id)
         name_condition = []
+        hda_history_join_conditions = [
+            e.history_dataset_association_id == b.id,
+            e.extension == c.extension,
+            e._metadata == c._metadata,
+            e.version == a.dataset_version,
+        ]
         if identifier:
             stmt = stmt.join(d)
             data_conditions.append(
                 and_(
-                    d.name.in_({f"{_}|__identifier__" for _ in k}),
+                    d.name == f"{k}|__identifier__",
                     d.value == json.dumps(identifier),
                 )
             )
         elif require_name_match:
-            hda_stmt = hda_stmt.where(e.name == c.name)
+            hda_history_join_conditions.append(e.name == c.name)
            name_condition.append(b.name == c.name)
-        hda_stmt = (
-            hda_stmt.where(
-                e.extension == c.extension,
-            )
-            .where(
-                a.dataset_version == e.version,
-            )
-            .where(
-                e._metadata == c._metadata,
-            )
-        )
+
+        stmt = stmt.outerjoin(e, and_(*hda_history_join_conditions))
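A minimal, self-contained sketch of the two techniques used in the new `_exclude_jobs_with_deleted_outputs` method above (a NOT EXISTS anti-join plus the optional MATERIALIZED prefix on the CTE), written against toy tables rather than the Galaxy models:

from sqlalchemy import Boolean, Column, ForeignKey, Integer, and_, exists, select, true
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class ToyJob(Base):
    __tablename__ = "toy_job"
    id = Column(Integer, primary_key=True)


class ToyOutputDataset(Base):
    __tablename__ = "toy_output_dataset"
    id = Column(Integer, primary_key=True)
    job_id = Column(Integer, ForeignKey("toy_job.id"))
    deleted = Column(Boolean, default=False)


def toy_jobs_without_deleted_outputs(use_materialized_hint: bool):
    # Anti-join: keep only jobs for which no deleted output row exists.
    deleted_output_exists = exists().where(
        and_(ToyOutputDataset.job_id == ToyJob.id, ToyOutputDataset.deleted == true())
    )
    stmt = select(ToyJob.id.label("job_id")).where(~deleted_output_exists)
    cte = stmt.cte("unordered_results")
    if use_materialized_hint:
        # Renders "WITH unordered_results AS MATERIALIZED (...)" so PostgreSQL >= 12
        # keeps the CTE as an optimization fence instead of inlining it.
        cte = cte.prefix_with("MATERIALIZED")
    return select(cte.c.job_id).order_by(cte.c.job_id.desc())

The real method applies the same anti-join twice (output datasets and output collection instances) on top of the already-filtered job statement.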
data_conditions.append( and_( - a.name.in_(k), + a.name == k, c.id == v, # c is the requested job input HDA # We need to make sure that the job we are looking for has been run with identical inputs. # Here we deal with 3 requirements: @@ -686,23 +753,51 @@ def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, b.metadata == c.metadata, *name_condition, ), - b.id.in_(hda_stmt), + e.history_dataset_association_id.isnot(None), ), or_(b.deleted == false(), c.deleted == false()), ) ) return stmt - def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v): + def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v, value_index): a = aliased(model.JobToInputLibraryDatasetAssociation) - stmt = stmt.add_columns(a.ldda_id) + labeled_col = a.ldda_id.label(f"{k}_{value_index}") + stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) - data_conditions.append(and_(a.name.in_(k), a.ldda_id == v)) - used_ids.append(a.ldda_id) + data_conditions.append(and_(a.name == k, a.ldda_id == v)) + used_ids.append(labeled_col) return stmt - def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_name_match=True): - # Determine depth based on collection_type of the HDCA we're matching against + def agg_expression(self, column): + if self.dialect_name == "sqlite": + return func.group_concat(column) + else: + return func.array_agg(column, order_by=column) + + def _build_stmt_for_hdca( + self, stmt, data_conditions, used_ids, k, v, user_id, value_index, require_name_match=True + ): + # Strategy for efficiently finding equivalent HDCAs: + # 1. Determine the structural depth of the target HDCA by its collection_type. + # 2. For the target HDCA (identified by 'v'): + # a. Dynamically construct Common Table Expressions (CTEs) to traverse its (potentially nested) structure down to individual datasets. + # b. Generate a "path signature string" for each dataset element, uniquely identifying its path within the collection. + # c. Aggregate these path strings into a canonical, sorted array (the "reference full signature") using array_agg with explicit ordering. + # 3. For all candidate HDCAs: + # a. Perform a similar dynamic traversal and path signature string generation. + # b. Aggregate these into sorted "full signature" arrays for each candidate HDCA. + # 4. Finally, identify equivalent HDCAs by comparing their full signature array directly against the reference full signature array. + # + # This approach is performant because: + # - It translates the complex problem of structural collection comparison into efficient array equality checks directly within the database. + # - It leverages the power of SQL CTEs and set-based operations, allowing the database query optimizer to find an efficient execution plan. + # - Joins required to traverse collection structures are built dynamically based on the actual depth, avoiding unnecessary complexity. + # - Signatures are computed and compared entirely on the database side, minimizing data transfer to the application. + # + # Note: CTEs are uniquely named using 'k' and 'v' to allow this logic to be embedded + # within larger queries or loops processing multiple target HDCAs. Aliases are used + # extensively to manage dynamic joins based on collection depth. 
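A condensed sketch of the signature idea described in the comment block above, against a hypothetical flat table; the real query builds the same shape dynamically over the nested levels, and `aggregate_order_by` is the PostgreSQL-specific way to order values inside `array_agg` (the code above routes this through `agg_expression` so SQLite also works):

from sqlalchemy import Column, Integer, String, Text, cast, func, select
from sqlalchemy.dialects.postgresql import aggregate_order_by
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class ToyElement(Base):
    __tablename__ = "toy_element"
    id = Column(Integer, primary_key=True)
    collection_id = Column(Integer)
    element_identifier = Column(String)
    dataset_id = Column(Integer)


# "identifier;dataset_id" uniquely describes one element's position and content.
path_signature = func.concat_ws(";", ToyElement.element_identifier, cast(ToyElement.dataset_id, Text))

# One canonical, sorted signature array per collection.
signatures = (
    select(
        ToyElement.collection_id,
        func.array_agg(aggregate_order_by(path_signature, path_signature)).label("signature"),
    )
    .group_by(ToyElement.collection_id)
    .cte("signatures")
)

# Collections equivalent to collection 42 (an illustrative id) have an identical signature array.
reference = select(signatures.c.signature).where(signatures.c.collection_id == 42).scalar_subquery()
equivalent = select(signatures.c.collection_id).where(signatures.c.signature == reference)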
collection_type = self.sa_session.scalar( select(model.DatasetCollection.collection_type) .select_from(model.HistoryDatasetCollectionAssociation) @@ -711,90 +806,567 @@ def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, require_na ) depth = collection_type.count(":") if collection_type else 0 - a = aliased(model.JobToInputDatasetCollectionAssociation) - hdca_input = aliased(model.HistoryDatasetCollectionAssociation) - root_collection = aliased(model.DatasetCollection) + a = aliased( + model.JobToInputDatasetCollectionAssociation, name=f"job_to_input_dataset_collection_1_{k}_{value_index}" + ) + hdca_input = aliased( + model.HistoryDatasetCollectionAssociation, + name=f"history_dataset_collection_association_1_{k}_{value_index}", + ) - dce_left = [aliased(model.DatasetCollectionElement) for _ in range(depth + 1)] - dce_right = [aliased(model.DatasetCollectionElement) for _ in range(depth + 1)] - hda_left = aliased(model.HistoryDatasetAssociation) - hda_right = aliased(model.HistoryDatasetAssociation) + _hdca_target_cte_ref = aliased(model.HistoryDatasetCollectionAssociation, name="_hdca_target_cte_ref") + _target_collection_cte_ref = aliased(model.DatasetCollection, name="_target_collection_cte_ref") + _dce_cte_ref_list = [ + aliased(model.DatasetCollectionElement, name=f"_dce_cte_ref_{i}") for i in range(depth + 1) + ] + _hda_cte_ref = aliased(model.HistoryDatasetAssociation, name="_hda_cte_ref") + + # --- NEW CTE: reference_hdca_all_dataset_ids_cte --- + # This CTE identifies all distinct dataset IDs that are part of the *reference* + # History Dataset Collection Association (HDCA). This is used for an initial, + # fast pre-filtering of candidate HDCAs. + reference_all_dataset_ids_select = ( + select(_hda_cte_ref.dataset_id.label("ref_dataset_id_for_overlap")) + .select_from(_hdca_target_cte_ref) + .join(_target_collection_cte_ref, _target_collection_cte_ref.id == _hdca_target_cte_ref.collection_id) + .join(_dce_cte_ref_list[0], _dce_cte_ref_list[0].dataset_collection_id == _target_collection_cte_ref.id) + ) - # Start joins from job → input HDCA → its collection → DCE - stmt = stmt.add_columns(a.dataset_collection_id) - stmt = stmt.join(a, a.job_id == model.Job.id) - stmt = stmt.join(hdca_input, hdca_input.id == a.dataset_collection_id) - stmt = stmt.join(root_collection, root_collection.id == hdca_input.collection_id) - stmt = stmt.join(dce_left[0], dce_left[0].dataset_collection_id == root_collection.id) - - # Join to target HDCA (v), then to its collection and its first-level DCE - hdca_target = aliased(model.HistoryDatasetCollectionAssociation) - target_collection = aliased(model.DatasetCollection) - stmt = stmt.join(hdca_target, hdca_target.id == v) - stmt = stmt.join(target_collection, target_collection.id == hdca_target.collection_id) - stmt = stmt.join(dce_right[0], dce_right[0].dataset_collection_id == target_collection.id) - - # Parallel walk the structure for i in range(1, depth + 1): - stmt = ( - stmt.join(dce_left[i], dce_left[i].dataset_collection_id == dce_left[i - 1].child_collection_id) - .join(dce_right[i], dce_right[i].dataset_collection_id == dce_right[i - 1].child_collection_id) - .filter(dce_left[i].element_identifier == dce_right[i].element_identifier) + reference_all_dataset_ids_select = reference_all_dataset_ids_select.join( + _dce_cte_ref_list[i], + _dce_cte_ref_list[i].dataset_collection_id == _dce_cte_ref_list[i - 1].child_collection_id, ) - # Compare leaf-level HDAs - leaf_left = dce_left[-1] - leaf_right = dce_right[-1] - stmt = 
stmt.outerjoin(hda_left, hda_left.id == leaf_left.hda_id) - stmt = stmt.outerjoin(hda_right, hda_right.id == leaf_right.hda_id) + _leaf_cte_ref = _dce_cte_ref_list[-1] + reference_all_dataset_ids_select = ( + reference_all_dataset_ids_select.join(_hda_cte_ref, _hda_cte_ref.id == _leaf_cte_ref.hda_id) + .where(_hdca_target_cte_ref.id == v) + .distinct() + ) + reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{value_index}") + # --- END NEW CTE --- + + # CTE 1: signature_elements_cte (for the reference HDCA) + # This CTE generates a unique "path signature string" for each dataset element + # within the reference HDCA. This string identifies the element's position + # and content within the nested collection structure. + signature_elements_select = ( + select( + func.concat_ws( + ";", + *[_dce_cte_ref_list[i].element_identifier for i in range(depth + 1)], + _hda_cte_ref.dataset_id.cast(sqlalchemy.Text), + ).label("path_signature_string") + ) + .select_from(_hdca_target_cte_ref) + .join(_target_collection_cte_ref, _target_collection_cte_ref.id == _hdca_target_cte_ref.collection_id) + .join(_dce_cte_ref_list[0], _dce_cte_ref_list[0].dataset_collection_id == _target_collection_cte_ref.id) + ) - data_conditions.append( - and_( - a.name.in_(k), - hda_left.dataset_id == hda_right.dataset_id, + for i in range(1, depth + 1): + signature_elements_select = signature_elements_select.join( + _dce_cte_ref_list[i], + _dce_cte_ref_list[i].dataset_collection_id == _dce_cte_ref_list[i - 1].child_collection_id, ) + + _leaf_cte_ref = _dce_cte_ref_list[-1] + signature_elements_select = signature_elements_select.join( + _hda_cte_ref, _hda_cte_ref.id == _leaf_cte_ref.hda_id + ) + signature_elements_select = signature_elements_select.where(_hdca_target_cte_ref.id == v) + signature_elements_cte = signature_elements_select.cte(f"signature_elements_{k}_{value_index}") + + # CTE 2: reference_full_signature_cte + # This CTE aggregates the path signature strings of the reference HDCA into a + # canonical, sorted array. This array represents the complete "signature" of the collection. + reference_full_signature_cte = ( + select(self.agg_expression(signature_elements_cte.c.path_signature_string).label("signature_array")) + .select_from(signature_elements_cte) + .cte(f"reference_full_signature_{k}_{value_index}") ) - used_ids.append(a.dataset_collection_id) # input-side HDCA - return stmt + candidate_hdca = aliased(model.HistoryDatasetCollectionAssociation, name="candidate_hdca") + candidate_hdca_history = aliased(model.History, name="candidate_hdca_history") + candidate_root_collection = aliased(model.DatasetCollection, name="candidate_root_collection") + candidate_dce_list = [ + aliased(model.DatasetCollectionElement, name=f"candidate_dce_{i}") for i in range(depth + 1) + ] + candidate_hda = aliased(model.HistoryDatasetAssociation, name="candidate_hda") + + # --- NEW CTE: candidate_hdca_pre_filter_ids_cte (First Pass Candidate Filtering) --- + # This CTE performs a quick initial filter on candidate HDCAs. + # It checks for: + # 1. User permissions (published or owned by the current user). + # 2. Whether the candidate HDCA contains any dataset IDs that are also present + # in the reference HDCA (an overlap check). This is a broad filter to + # reduce the number of candidates before more expensive signature generation. 
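The pre-filter described above, reduced to a standalone sketch over hypothetical tables: candidates must be visible to the user and share at least one dataset id with the reference collection before any signature aggregation happens.

from sqlalchemy import Boolean, Column, Integer, MetaData, Table, or_, select, true

metadata = MetaData()
history = Table(
    "history",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("user_id", Integer),
    Column("published", Boolean),
)
hdca = Table(
    "hdca",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("history_id", Integer),
)
element = Table(
    "element",
    metadata,
    Column("hdca_id", Integer),
    Column("dataset_id", Integer),
)


def candidate_prefilter(reference_dataset_ids_select, user_id: int):
    # reference_dataset_ids_select: a SELECT yielding the reference collection's dataset ids.
    # Distinct candidate collection ids that (a) the user may see and
    # (b) overlap the reference collection in at least one dataset id.
    return (
        select(hdca.c.id.label("candidate_hdca_id"))
        .distinct()
        .join(history, history.c.id == hdca.c.history_id)
        .join(element, element.c.hdca_id == hdca.c.id)
        .where(or_(history.c.user_id == user_id, history.c.published == true()))
        .where(element.c.dataset_id.in_(reference_dataset_ids_select))
    )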
+ candidate_hdca_pre_filter_ids_select = ( + select(candidate_hdca.id.label("candidate_hdca_id")) + .distinct() + .select_from(candidate_hdca) + .join(candidate_hdca_history, candidate_hdca_history.id == candidate_hdca.history_id) + .join(candidate_root_collection, candidate_root_collection.id == candidate_hdca.collection_id) + .join(candidate_dce_list[0], candidate_dce_list[0].dataset_collection_id == candidate_root_collection.id) + ) - def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v): - dce = self.sa_session.get_one(model.DatasetCollectionElement, v) - if dce.child_collection: - depth = dce.child_collection.collection_type.count(":") + 1 - else: - depth = 0 - a = aliased(model.JobToInputDatasetCollectionElementAssociation) - dce_left = [aliased(model.DatasetCollectionElement) for _ in range(depth + 1)] - dce_right = [aliased(model.DatasetCollectionElement) for _ in range(depth + 1)] - hda_left = aliased(model.HistoryDatasetAssociation) - hda_right = aliased(model.HistoryDatasetAssociation) - - # Base joins - stmt = stmt.add_columns(a.dataset_collection_element_id) - stmt = stmt.join(a, a.job_id == model.Job.id) - stmt = stmt.join(dce_left[0], dce_left[0].id == a.dataset_collection_element_id) - stmt = stmt.join(dce_right[0], dce_right[0].id == v) + for i in range(1, depth + 1): + candidate_hdca_pre_filter_ids_select = candidate_hdca_pre_filter_ids_select.join( + candidate_dce_list[i], + candidate_dce_list[i].dataset_collection_id == candidate_dce_list[i - 1].child_collection_id, + ) + + _leaf_candidate_dce_pre = candidate_dce_list[-1] + candidate_hdca_pre_filter_ids_select = ( + candidate_hdca_pre_filter_ids_select.join(candidate_hda, candidate_hda.id == _leaf_candidate_dce_pre.hda_id) + .where(or_(candidate_hdca_history.user_id == user_id, candidate_hdca_history.published == true())) + .where(candidate_hda.dataset_id.in_(select(reference_all_dataset_ids_cte.c.ref_dataset_id_for_overlap))) + ) + candidate_hdca_pre_filter_ids_cte = candidate_hdca_pre_filter_ids_select.cte( + f"cand_hdca_pre_filter_ids_{k}_{value_index}" + ) + # --- END NEW CTE --- + + # CTE 3: candidate_signature_elements_cte + # This CTE generates the path signature string for each element of the + # *pre-filtered candidate* HDCAs. 
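Before the candidate signature CTE below, a note on the join pattern it shares with the reference CTEs: one `DatasetCollectionElement` alias per nesting level, chained from parent to child collection, with the depth derived from `collection_type`. In isolation (toy model, not the Galaxy schema):

from sqlalchemy import Column, Integer, select
from sqlalchemy.orm import DeclarativeBase, aliased


class Base(DeclarativeBase):
    pass


class ToyDCE(Base):
    __tablename__ = "toy_dataset_collection_element"
    id = Column(Integer, primary_key=True)
    dataset_collection_id = Column(Integer)
    child_collection_id = Column(Integer, nullable=True)
    hda_id = Column(Integer, nullable=True)


def leaf_hda_ids(root_collection_id: int, collection_type: str):
    # "list" -> depth 0, "list:paired" -> depth 1, "list:list:paired" -> depth 2.
    depth = collection_type.count(":") if collection_type else 0
    levels = [aliased(ToyDCE, name=f"dce_level_{i}") for i in range(depth + 1)]
    stmt = (
        select(levels[-1].hda_id)
        .select_from(levels[0])
        .where(levels[0].dataset_collection_id == root_collection_id)
    )
    for i in range(1, depth + 1):
        stmt = stmt.join(
            levels[i], levels[i].dataset_collection_id == levels[i - 1].child_collection_id
        )
    return stmt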
+ candidate_signature_elements_select = ( + select( + candidate_hdca.id.label("candidate_hdca_id"), + func.concat_ws( + ";", + *[candidate_dce_list[i].element_identifier for i in range(depth + 1)], + candidate_hda.dataset_id.cast(sqlalchemy.Text), + ).label("path_signature_string"), + ) + .select_from(candidate_hdca) + # Apply the pre-filter here to limit the candidates for full signature generation + .where(candidate_hdca.id.in_(select(candidate_hdca_pre_filter_ids_cte.c.candidate_hdca_id))) + .join(candidate_hdca_history, candidate_hdca_history.id == candidate_hdca.history_id) + .join(candidate_root_collection, candidate_root_collection.id == candidate_hdca.collection_id) + .join(candidate_dce_list[0], candidate_dce_list[0].dataset_collection_id == candidate_root_collection.id) + .where(or_(candidate_hdca_history.user_id == user_id, candidate_hdca_history.published == true())) + ) - # Parallel walk the collection structure for i in range(1, depth + 1): - stmt = ( - stmt.join(dce_left[i], dce_left[i].dataset_collection_id == dce_left[i - 1].child_collection_id) - .join(dce_right[i], dce_right[i].dataset_collection_id == dce_right[i - 1].child_collection_id) - .filter(dce_left[i].element_identifier == dce_right[i].element_identifier) + candidate_signature_elements_select = candidate_signature_elements_select.join( + candidate_dce_list[i], + candidate_dce_list[i].dataset_collection_id == candidate_dce_list[i - 1].child_collection_id, + ) + + _leaf_candidate_dce = candidate_dce_list[-1] + candidate_signature_elements_select = candidate_signature_elements_select.join( + candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id + ) + candidate_signature_elements_cte = candidate_signature_elements_select.cte( + f"candidate_signature_elements_{k}_{value_index}" + ) + + # CTE 4: candidate_full_signatures_cte + # This CTE aggregates the path signature strings for the candidate HDCAs into + # ordered arrays, similar to the reference's full signature. + candidate_full_signatures_cte = ( + select( + candidate_signature_elements_cte.c.candidate_hdca_id, + self.agg_expression(candidate_signature_elements_cte.c.path_signature_string).label( + "full_signature_array" + ), ) + .select_from(candidate_signature_elements_cte) + .group_by(candidate_signature_elements_cte.c.candidate_hdca_id) + .cte(f"candidate_full_signatures_{k}_{value_index}") + ) - # Compare dataset_ids at the leaf level - leaf_left = dce_left[-1] - leaf_right = dce_right[-1] - stmt = stmt.outerjoin(hda_left, hda_left.id == leaf_left.hda_id) - stmt = stmt.outerjoin(hda_right, hda_right.id == leaf_right.hda_id) + # CTE 5: equivalent_hdca_ids_cte + # This final CTE identifies the HDCAs that are truly "equivalent" by + # comparing their full signature array to the reference HDCA's full signature array. + equivalent_hdca_ids_cte = ( + select(candidate_full_signatures_cte.c.candidate_hdca_id.label("equivalent_id")) + .where( + candidate_full_signatures_cte.c.full_signature_array + == select(reference_full_signature_cte.c.signature_array).scalar_subquery() + ) + .cte(f"equivalent_hdca_ids_{k}_{value_index}") + ) - data_conditions.append(and_(a.name.in_(k), hda_left.dataset_id == hda_right.dataset_id)) + # Main query `stmt` construction + # This section joins the base job statement with the associations and then filters + # by the HDCAs identified as equivalent in the CTEs. 
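The `value_index`-suffixed labels applied below (and in the HDA/LDDA builders above) are what make repeated inputs distinguishable: each occurrence of the same parameter name gets its own alias and its own labeled column in the GROUP BY. A stripped-down sketch of the two-occurrence case against the Galaxy models (the literal HDA id 5 is illustrative only):

from sqlalchemy import select
from sqlalchemy.orm import aliased

from galaxy import model

# Two aliases for the same association table, one per occurrence of "input1".
a0 = aliased(model.JobToInputDatasetAssociation)
a1 = aliased(model.JobToInputDatasetAssociation)

stmt = (
    select(
        model.Job.id.label("job_id"),
        a0.dataset_id.label("input1_0"),
        a1.dataset_id.label("input1_1"),
    )
    .join(a0, a0.job_id == model.Job.id)
    .join(a1, a1.job_id == model.Job.id)
    # Both occurrences must point at the same requested input dataset.
    .where(a0.name == "input1", a0.dataset_id == 5)
    .where(a1.name == "input1", a1.dataset_id == 5)
    .group_by(model.Job.id, a0.dataset_id, a1.dataset_id)
)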
+ labeled_col = a.dataset_collection_id.label(f"{k}_{value_index}") + stmt = stmt.add_columns(labeled_col) + stmt = stmt.join(a, a.job_id == model.Job.id) - used_ids.append(a.dataset_collection_element_id) + stmt = stmt.join( + hdca_input, + and_( + hdca_input.id == a.dataset_collection_id, + # Filter the main query to only include HDCAs found in the + # 'equivalent_hdca_ids_cte'. + hdca_input.id.in_(select(equivalent_hdca_ids_cte.c.equivalent_id)), + ), + ) + + used_ids.append(labeled_col) + data_conditions.append(a.name == k) return stmt + def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id, value_index): + dce_root_target = self.sa_session.get_one(model.DatasetCollectionElement, v) + + # Determine if the target DCE points to an HDA or a child collection + if dce_root_target.child_collection_id: + # This DCE represents a collection, apply the signature comparison approach + target_collection_id = dce_root_target.child_collection_id + collection_type = self.sa_session.scalar( + select(model.DatasetCollection.collection_type).where( + model.DatasetCollection.id == target_collection_id + ) + ) + depth = collection_type.count(":") if collection_type else 0 + + # Aliases for the target DCE's collection structure + _dce_target_root_ref = aliased( + model.DatasetCollectionElement, name=f"_dce_target_root_ref_{k}_{value_index}" + ) + _dce_target_child_collection_ref = aliased( + model.DatasetCollection, name=f"_dce_target_child_collection_ref_{k}_{value_index}" + ) + # List of aliases for each potential nested level of DatasetCollectionElements + _dce_target_level_list = [ + aliased(model.DatasetCollectionElement, name=f"_dce_target_level_{k}_{value_index}_{i}") + for i in range(depth + 1) + ] + _hda_target_ref = aliased(model.HistoryDatasetAssociation, name=f"_hda_target_ref_{k}_{value_index}") + + # --- CTE: reference_dce_all_dataset_ids_cte --- + # This CTE (Common Table Expression) identifies all distinct dataset IDs + # that are part of the *reference* dataset collection (the one we're searching for). + # This helps in the initial filtering of candidate collections. + reference_all_dataset_ids_select = ( + select(_hda_target_ref.dataset_id.label("ref_dataset_id_for_overlap")) + .select_from(_dce_target_root_ref) + .join( + _dce_target_child_collection_ref, + _dce_target_child_collection_ref.id == _dce_target_root_ref.child_collection_id, + ) + .join( + _dce_target_level_list[0], + _dce_target_level_list[0].dataset_collection_id == _dce_target_child_collection_ref.id, + ) + ) + # Dynamically add joins for each nested level of the collection + for i in range(1, depth + 1): + reference_all_dataset_ids_select = reference_all_dataset_ids_select.join( + _dce_target_level_list[i], + _dce_target_level_list[i].dataset_collection_id + == _dce_target_level_list[i - 1].child_collection_id, + ) + _leaf_target_dce_ref = _dce_target_level_list[-1] + reference_all_dataset_ids_select = ( + reference_all_dataset_ids_select.join( + _hda_target_ref, _hda_target_ref.id == _leaf_target_dce_ref.hda_id + ) + .where(_dce_target_root_ref.id == v) + .distinct() + ) + reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{value_index}") + + # --- CTE: reference_dce_signature_elements_cte --- + # This CTE generates a "path signature string" for each individual element + # within the *reference* collection. 
This signature combines identifiers + # from all levels of the collection and the dataset ID, providing a unique + # identifier for each dataset's position within the collection structure. + path_components = [ + _dce_target_root_ref.element_identifier, + *[_dce_target_level_list[i].element_identifier for i in range(depth + 1)], + _hda_target_ref.dataset_id.cast(sqlalchemy.Text), # Ensure type for concat_ws + ] + + reference_dce_signature_elements_select = ( + select( + func.concat_ws(";", *path_components).label("path_signature_string"), + _hda_target_ref.dataset_id.label("raw_dataset_id_for_ordering"), # Keep original type for ordering + ) + .select_from(_dce_target_root_ref) + .join( + _dce_target_child_collection_ref, + _dce_target_child_collection_ref.id == _dce_target_root_ref.child_collection_id, + ) + .join( + _dce_target_level_list[0], + _dce_target_level_list[0].dataset_collection_id == _dce_target_child_collection_ref.id, + ) + ) + + for i in range(1, depth + 1): + reference_dce_signature_elements_select = reference_dce_signature_elements_select.join( + _dce_target_level_list[i], + _dce_target_level_list[i].dataset_collection_id + == _dce_target_level_list[i - 1].child_collection_id, + ) + + _leaf_target_dce_ref = _dce_target_level_list[-1] + reference_dce_signature_elements_select = reference_dce_signature_elements_select.join( + _hda_target_ref, _hda_target_ref.id == _leaf_target_dce_ref.hda_id + ).where(_dce_target_root_ref.id == v) + reference_dce_signature_elements_cte = reference_dce_signature_elements_select.cte( + f"ref_dce_sig_els_{k}_{value_index}" + ) + + # --- CTE: reference_full_signature_cte --- + # This CTE aggregates the path signatures and dataset IDs of the *reference* + # collection into ordered arrays. These arrays form the "full signature" + # used for direct comparison with candidate collections. + reference_full_signature_cte = ( + select( + self.agg_expression(reference_dce_signature_elements_cte.c.path_signature_string).label( + "signature_array" + ), + self.agg_expression(reference_dce_signature_elements_cte.c.raw_dataset_id_for_ordering).label( + "ordered_dataset_id_array" + ), + func.count(reference_dce_signature_elements_cte.c.path_signature_string).label( + "element_count" + ), # Count elements based on path_signature_string + ) + .select_from(reference_dce_signature_elements_cte) + .cte(f"ref_dce_full_sig_{k}_{value_index}") + ) + + # --- Aliases for Candidate Dataset Collection Structure --- + # These aliases are used to represent potential matching dataset collections + # in the database, which will be compared against the reference. + candidate_dce_root = aliased(model.DatasetCollectionElement, name=f"candidate_dce_root_{k}_{v}") + candidate_dce_child_collection = aliased( + model.DatasetCollection, name=f"candidate_dce_child_collection_{k}_{value_index}" + ) + candidate_dce_level_list = [ + aliased(model.DatasetCollectionElement, name=f"candidate_dce_level_{k}_{value_index}_{i}") + for i in range(depth + 1) + ] + candidate_hda = aliased(model.HistoryDatasetAssociation, name=f"candidate_hda_{k}_{value_index}") + candidate_history = aliased(model.History, name=f"candidate_history_{k}_{value_index}") + + # --- CTE: candidate_dce_pre_filter_ids_cte (Initial Candidate Filtering) --- + # This CTE performs a first pass to quickly narrow down potential candidate + # dataset collections. It checks for: + # 1. Existence of a child collection (ensuring it's a collection, not a single HDA). + # 2. User permissions (published or owned by the current user). 
+ # 3. Overlap in *any* dataset IDs with the reference collection. + candidate_dce_pre_filter_ids_select = ( + select(candidate_dce_root.id.label("candidate_dce_id")) + .distinct() + .select_from(candidate_dce_root) + .where(candidate_dce_root.child_collection_id.isnot(None)) + .join( + candidate_dce_child_collection, + candidate_dce_child_collection.id == candidate_dce_root.child_collection_id, + ) + .join( + candidate_dce_level_list[0], + candidate_dce_level_list[0].dataset_collection_id == candidate_dce_child_collection.id, + ) + ) + for i in range(1, depth + 1): + candidate_dce_pre_filter_ids_select = candidate_dce_pre_filter_ids_select.join( + candidate_dce_level_list[i], + candidate_dce_level_list[i].dataset_collection_id + == candidate_dce_level_list[i - 1].child_collection_id, + ) + _leaf_candidate_dce_pre = candidate_dce_level_list[-1] + candidate_dce_pre_filter_ids_select = ( + candidate_dce_pre_filter_ids_select.join( + candidate_hda, candidate_hda.id == _leaf_candidate_dce_pre.hda_id + ) + .join(candidate_history, candidate_history.id == candidate_hda.history_id) + .where(or_(candidate_history.published == true(), candidate_history.user_id == user_id)) + .where(candidate_hda.dataset_id.in_(select(reference_all_dataset_ids_cte.c.ref_dataset_id_for_overlap))) + ) + candidate_dce_pre_filter_ids_cte = candidate_dce_pre_filter_ids_select.cte( + f"cand_dce_pre_filter_ids_{k}_{value_index}" + ) + + # --- CTE: candidate_dce_signature_elements_cte --- + # This CTE calculates the path signature string and raw dataset ID for each + # element within the *pre-filtered candidate* collections. This is similar + # to the reference signature elements CTE but for the candidates. + candidate_path_components_fixed = [ + candidate_dce_root.element_identifier, + *[candidate_dce_level_list[i].element_identifier for i in range(depth + 1)], + candidate_hda.dataset_id.cast(sqlalchemy.Text), # Ensure type for concat_ws + ] + + candidate_dce_signature_elements_select = ( + select( + candidate_dce_root.id.label("candidate_dce_id"), + func.concat_ws(";", *candidate_path_components_fixed).label("path_signature_string"), + candidate_hda.dataset_id.label("dataset_id_for_ordered_array"), # This is now Integer + ) + .select_from(candidate_dce_root) + # Apply the initial filter here! 
+ .where(candidate_dce_root.id.in_(select(candidate_dce_pre_filter_ids_cte.c.candidate_dce_id))) + .where(candidate_dce_root.child_collection_id.isnot(None)) + .join( + candidate_dce_child_collection, + candidate_dce_child_collection.id == candidate_dce_root.child_collection_id, + ) + .join( + candidate_dce_level_list[0], + candidate_dce_level_list[0].dataset_collection_id == candidate_dce_child_collection.id, + ) + ) + # Add dynamic joins for nested levels + for i in range(1, depth + 1): + candidate_dce_signature_elements_select = candidate_dce_signature_elements_select.join( + candidate_dce_level_list[i], + candidate_dce_level_list[i].dataset_collection_id + == candidate_dce_level_list[i - 1].child_collection_id, + ) + + _leaf_candidate_dce = candidate_dce_level_list[-1] + candidate_dce_signature_elements_select = ( + candidate_dce_signature_elements_select.join( + candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id + ) + .join(candidate_history, candidate_history.id == candidate_hda.history_id) + .where(or_(candidate_history.published == true(), candidate_history.user_id == user_id)) + ) + candidate_dce_signature_elements_cte = candidate_dce_signature_elements_select.cte( + f"cand_dce_sig_els_{k}_{value_index}" + ) + + # --- CTE: candidate_pre_signatures_cte (Candidate Aggregation for Comparison) --- + # This CTE aggregates the dataset IDs from the candidate collections into + # ordered arrays, similar to `reference_full_signature_cte`. It also + # counts the elements to ensure size consistency. + candidate_pre_signatures_cte = ( + select( + candidate_dce_signature_elements_cte.c.candidate_dce_id, + # Corrected array_agg syntax: pass column directly, use order_by keyword + self.agg_expression(candidate_dce_signature_elements_cte.c.dataset_id_for_ordered_array).label( + "candidate_ordered_dataset_ids_array" + ), + func.count(candidate_dce_signature_elements_cte.c.candidate_dce_id).label( + "candidate_element_count" + ), + ) + .select_from(candidate_dce_signature_elements_cte) + .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id) + .cte(f"cand_dce_pre_sig_{k}_{value_index}") + ) + + # --- CTE: filtered_cand_dce_by_dataset_ids_cte (Filtering by Element Count and Dataset ID Array) --- + # This crucial CTE filters the candidate collections further by comparing: + # 1. Their total element count with the reference collection's element count. + # 2. Their ordered array of dataset IDs with the reference's ordered array. + # This step ensures that candidate collections have the same number of elements + # and contain the exact same datasets, in the same logical order (based on path). + filtered_cand_dce_by_dataset_ids_cte = ( + select(candidate_pre_signatures_cte.c.candidate_dce_id) + .select_from(candidate_pre_signatures_cte, reference_full_signature_cte) + .where( + and_( + candidate_pre_signatures_cte.c.candidate_element_count + == reference_full_signature_cte.c.element_count, + candidate_pre_signatures_cte.c.candidate_ordered_dataset_ids_array + == reference_full_signature_cte.c.ordered_dataset_id_array, + ) + ) + .cte(f"filtered_cand_dce_{k}_{value_index}") + ) + + # --- CTE: final_candidate_signatures_cte (Final Full Signature Calculation for Matched Candidates) --- + # For the candidates that passed the previous filtering, this CTE calculates + # their full path signature array. This signature represents the complete + # structural and content identity of the collection. 
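As a concrete, made-up illustration of what the signature arrays compared by these CTEs hold, take a list:paired element "sample1" whose pair wraps datasets 11 and 12:

# Each leaf contributes "outer_identifier;inner_identifier;dataset_id" (built in SQL via
# concat_ws); the canonically ordered list of those strings is the collection's signature.
leaf_signatures = ["sample1;forward;11", "sample1;reverse;12"]
signature_array = sorted(leaf_signatures)

same_content = sorted(["sample1;reverse;12", "sample1;forward;11"])
assert signature_array == same_content  # identical paths and datasets -> equivalent

swapped_datasets = sorted(["sample1;forward;12", "sample1;reverse;11"])
assert signature_array != swapped_datasets  # same datasets under different identifiers -> not equivalent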
+ final_candidate_signatures_cte = ( + select( + candidate_dce_signature_elements_cte.c.candidate_dce_id, + self.agg_expression(candidate_dce_signature_elements_cte.c.path_signature_string).label( + "full_signature_array" + ), + ) + .select_from(candidate_dce_signature_elements_cte) + .where( + candidate_dce_signature_elements_cte.c.candidate_dce_id.in_( + select(filtered_cand_dce_by_dataset_ids_cte.c.candidate_dce_id) + ) + ) + .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id) + .cte(f"final_cand_dce_full_sig_{k}_{value_index}") + ) + + # --- Main Query Construction for Dataset Collection Elements --- + # This section joins the main `stmt` (representing jobs) with the CTEs + # to filter jobs whose input DCE matches the reference DCE's full signature. + a = aliased( + model.JobToInputDatasetCollectionElementAssociation, + name=f"job_to_input_dce_association_{k}_{value_index}", + ) + labeled_col = a.dataset_collection_element_id.label(f"{k}_{value_index}") + stmt = stmt.add_columns(labeled_col) + stmt = stmt.join(a, a.job_id == model.Job.id) + + input_dce = aliased(model.DatasetCollectionElement) + + stmt = stmt.join( + input_dce, + and_( + input_dce.id == a.dataset_collection_element_id, + # The final filter: ensure the input DCE's ID is among those candidates + # whose full signature array *exactly matches* the reference's signature array. + input_dce.id.in_( + select(final_candidate_signatures_cte.c.candidate_dce_id).where( + final_candidate_signatures_cte.c.full_signature_array + == select(reference_full_signature_cte.c.signature_array).scalar_subquery() + ) + ), + ), + ) + + data_conditions.append(a.name == k) + used_ids.append(labeled_col) + return stmt + + else: # DCE points directly to an HDA (dce_root_target.hda_id is not None and child_collection_id is None) + # For this simple case, the full signature array comparison for nested collections doesn't apply. + # We can use a direct comparison of the HDA and element_identifier. + # This logic needs to align with how this type of DCE was previously matched. 
+ + # Aliases for the "left" side (job to input DCE path) + a = aliased( + model.JobToInputDatasetCollectionElementAssociation, + name=f"job_to_input_dce_association_{k}_{value_index}", + ) + dce_left = aliased(model.DatasetCollectionElement, name=f"dce_left_{k}_{value_index}") + hda_left = aliased(model.HistoryDatasetAssociation, name=f"hda_left_{k}_{value_index}") + + # Aliases for the "right" side (target DCE path in the main query) + dce_right = aliased(model.DatasetCollectionElement, name=f"dce_right_{k}_{value_index}") + hda_right = aliased(model.HistoryDatasetAssociation, name=f"hda_right_{k}_{value_index}") + + # Start joins from job → input DCE association → first-level DCE (left side) + labeled_col = a.dataset_collection_element_id.label(f"{k}_{value_index}") + stmt = stmt.add_columns(labeled_col) + stmt = stmt.join(a, a.job_id == model.Job.id) + stmt = stmt.join(dce_left, dce_left.id == a.dataset_collection_element_id) + stmt = stmt.join(hda_left, hda_left.id == dce_left.hda_id) # Join to HDA for left side + + # Join to target DCE (v) directly (right side) + stmt = stmt.join(dce_right, dce_right.id == v) + stmt = stmt.join(hda_right, hda_right.id == dce_right.hda_id) # Join to HDA for right side + + # Compare element identifiers and dataset IDs + data_conditions.append( + and_( + a.name == k, + dce_left.element_identifier == dce_right.element_identifier, + hda_left.dataset_id == hda_right.dataset_id, # Direct dataset_id comparison + ) + ) + used_ids.append(labeled_col) + return stmt + def view_show_job(trans, job: Job, full: bool) -> Dict: is_admin = trans.user_is_admin diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index c063d8ec3677..f8506ad63b77 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1614,9 +1614,6 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable): back_populates="job", uselist=False ) - any_output_dataset_collection_instances_deleted = None - any_output_dataset_deleted = None - dict_collection_visible_keys = ["id", "state", "exit_code", "update_time", "create_time", "galaxy_version"] dict_element_visible_keys = [ "id", @@ -12082,30 +12079,6 @@ def __repr__(self): # ---------------------------------------------------------------------------------------- # The following statements must not precede the mapped models defined above. 
-Job.any_output_dataset_collection_instances_deleted = deferred(
-    column_property(  # type:ignore[assignment]
-        exists(HistoryDatasetCollectionAssociation.id).where(
-            and_(
-                Job.id == JobToOutputDatasetCollectionAssociation.job_id,
-                HistoryDatasetCollectionAssociation.id == JobToOutputDatasetCollectionAssociation.dataset_collection_id,
-                HistoryDatasetCollectionAssociation.deleted == true(),
-            )
-        ),
-    )
-)
-
-Job.any_output_dataset_deleted = deferred(
-    column_property(  # type:ignore[assignment]
-        exists(HistoryDatasetAssociation.id).where(
-            and_(
-                Job.id == JobToOutputDatasetAssociation.job_id,
-                HistoryDatasetAssociation.table.c.id == JobToOutputDatasetAssociation.dataset_id,
-                HistoryDatasetAssociation.table.c.deleted == true(),
-            )
-        ),
-    )
-)
-
 History.average_rating = column_property(  # type:ignore[assignment]
     select(func.avg(HistoryRatingAssociation.rating))
     .where(HistoryRatingAssociation.history_id == History.id)
diff --git a/lib/galaxy_test/api/test_tools.py b/lib/galaxy_test/api/test_tools.py
index 6e47c5e14271..60cd250a56a3 100644
--- a/lib/galaxy_test/api/test_tools.py
+++ b/lib/galaxy_test/api/test_tools.py
@@ -1056,6 +1056,41 @@ def test_run_cat1_use_cached_job(self):
         assert len(filenames) == 3, filenames
         assert len(set(filenames)) <= 2, filenames
 
+    @skip_without_tool("cat_list")
+    @skip_without_tool("__SORTLIST__")
+    def test_run_cat_list_hdca_sort_order_respected_use_cached_job(self):
+        with self.dataset_populator.test_history_for(
+            self.test_run_cat_list_hdca_sort_order_respected_use_cached_job
+        ) as history_id:
+            fetch_response = self.dataset_collection_populator.create_list_in_history(
+                history_id, wait=True, contents=[("C", "3"), ("B", "2"), ("A", "1")]
+            ).json()
+            hdca_not_sorted_id = fetch_response["output_collections"][0]["id"]
+            result = self._run(
+                tool_id="__SORTLIST__",
+                history_id=history_id,
+                inputs={"input": {"src": "hdca", "id": hdca_not_sorted_id}},
+                assert_ok=True,
+            )
+            hdca_sorted_id = result["output_collections"][0]["id"]
+            self.dataset_populator.get_history_collection_details(history_id, content_id=hdca_sorted_id)
+            hdca_sorted = self.dataset_populator.get_history_collection_details(history_id, content_id=hdca_sorted_id)
+            hdca_not_sorted = self.dataset_populator.get_history_collection_details(
+                history_id, content_id=hdca_not_sorted_id
+            )
+            assert hdca_sorted["elements"][0]["object"]["name"] == "A"
+            assert hdca_not_sorted["elements"][0]["object"]["name"] == "C"
+            self._run("cat_list", history_id, inputs={"input1": {"src": "hdca", "id": hdca_sorted_id}}, assert_ok=True)
+            job = self._run(
+                "cat_list",
+                history_id,
+                inputs={"input1": {"src": "hdca", "id": hdca_not_sorted_id}},
+                assert_ok=True,
+                use_cached_job=True,
+            )
+            job_details = self.dataset_populator.get_job_details(job["jobs"][0]["id"], full=True).json()
+            assert not job_details["copied_from_job_id"]
+
     @skip_without_tool("cat1")
     @requires_new_history
     def test_run_cat1_use_cached_job_from_public_history(self):
@@ -1097,6 +1132,23 @@ def test_run_cat1_use_cached_job_renamed_input(self):
             job_details = self.dataset_populator.get_job_details(copied_job_id, full=True).json()
             assert job_details["copied_from_job_id"] == outputs_one["jobs"][0]["id"]
 
+    @skip_without_tool("cat_list")
+    @requires_new_history
+    def test_run_cat_list_use_cached_job_repeated_input(self):
+        with self.dataset_populator.test_history_for(
+            self.test_run_cat_list_use_cached_job_repeated_input
+        ) as history_id:
+            # Run simple non-upload tool with an input data parameter.
+ input_value = dataset_to_param(self.dataset_populator.new_dataset(history_id=history_id)) + inputs = {"input1": {"batch": False, "values": [input_value, input_value]}} + outputs_one = self._run("cat_list", history_id, inputs, assert_ok=True, wait_for_job=True) + outputs_two = self._run( + "cat_list", history_id, inputs, assert_ok=True, wait_for_job=True, use_cached_job=True + ) + copied_job_id = outputs_two["jobs"][0]["id"] + job_details = self.dataset_populator.get_job_details(copied_job_id, full=True).json() + assert job_details["copied_from_job_id"] == outputs_one["jobs"][0]["id"] + @skip_without_tool("collection_creates_list") @requires_new_history def test_run_collection_creates_list_use_cached_job(self):
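The two new tests above follow the cached-job pattern used throughout this file; condensed to its core it looks roughly like this (a hypothetical minimal variant, reusing the populator helpers already imported in this module):

@skip_without_tool("cat1")
def test_run_cat1_use_cached_job_minimal_sketch(self):
    # Hypothetical condensed example: run the same tool twice with identical inputs,
    # the second time with use_cached_job=True, then confirm the second job was
    # copied from the first instead of being re-executed.
    with self.dataset_populator.test_history_for(self.test_run_cat1_use_cached_job_minimal_sketch) as history_id:
        inputs = {"input1": dataset_to_param(self.dataset_populator.new_dataset(history_id=history_id))}
        first = self._run("cat1", history_id, inputs, assert_ok=True, wait_for_job=True)
        second = self._run("cat1", history_id, inputs, assert_ok=True, wait_for_job=True, use_cached_job=True)
        job_details = self.dataset_populator.get_job_details(second["jobs"][0]["id"], full=True).json()
        assert job_details["copied_from_job_id"] == first["jobs"][0]["id"]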