diff --git a/src/datasets/loaders/scrnaseq/op3_loader/config.vsh.yaml b/src/datasets/loaders/scrnaseq/op3_loader/config.vsh.yaml
new file mode 100644
index 000000000..26cf1f3c9
--- /dev/null
+++ b/src/datasets/loaders/scrnaseq/op3_loader/config.vsh.yaml
@@ -0,0 +1,107 @@
+name: op3_loader
+namespace: datasets/loaders/scrnaseq
+description: |
+  Loads and preprocesses the OP3 dataset from GEO accession GSE279945.
+
+argument_groups:
+  - name: Input
+    arguments:
+      - name: "--input"
+        type: string
+        description: "URL of the input .h5ad file."
+        direction: input
+        required: false
+        default: https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad
+        example: https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad
+      - name: "--data_type"
+        type: string
+        description: "Data type to load"
+        default: sc
+        choices: [sc]
+      - name: "--var_feature_name"
+        type: string
+        description: "Location of the feature names in .var. Set to 'index' if the feature names are the index."
+        default: index
+  - name: Data Filtering
+    description: "Arguments for filtering the dataset"
+    arguments:
+      - name: "--donor_id"
+        type: string
+        description: "Donor ID to filter for ('Donor 1', 'Donor 2', or 'Donor 3'). If not specified, all donors are included."
+        required: false
+      - name: "--cell_type"
+        type: string
+        description: "Cell type to filter for ('T cells', 'B cells', 'NK cells', or 'Myeloid cells'). If not specified, all cell types are included."
+        required: false
+      - name: "--perturbation"
+        type: string
+        description: "Perturbation to filter for. If not specified, all perturbations are included."
+        required: false
+
+  - name: Dataset Metadata
+    description: "Metadata about the dataset"
+    arguments:
+      - name: "--dataset_id"
+        type: string
+        description: "Unique identifier for the dataset"
+        default: "op3"
+      - name: "--dataset_name"
+        type: string
+        description: "Human-readable name for the dataset"
+        default: "OP3: single-cell multimodal dataset in PBMCs for perturbation prediction benchmarking"
+      - name: "--dataset_summary"
+        type: string
+        description: "Short summary of the dataset"
+        default: "The Open Problems Perturbation Prediction (OP3) dataset with small molecule perturbations in PBMCs"
+      - name: "--dataset_description"
+        type: string
+        description: "Detailed description of the dataset"
+        default: "The OP3 dataset is to date the largest single-cell small molecule perturbation dataset in primary tissue with multiple donor replicates."
+      - name: "--dataset_reference"
+        type: string
+        description: "Reference to the publication or accession in which the dataset was released."
+        required: false
+        default: GSE279945
+      - name: "--dataset_url"
+        type: string
+        description: "Link to the original source of the dataset."
+        required: false
+        default: https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad
+
+  - name: Output
+    description: "Output parameters"
+    arguments:
+      - name: "--output"
+        type: file
+        description: "Output h5ad file."
+        direction: output
+        required: true
+      - name: "--output_compression"
+        type: string
+        choices: [gzip, lzf]
+        required: false
+        default: "gzip"
+
+resources:
+  - type: python_script
+    path: script.py
+
+engines:
+  - type: docker
+    image: python:3.11
+    setup:
+      - type: python
+        packages:
+          - scanpy
+          - anndata
+          - pandas
+          - numpy
+          - requests
+    test_setup:
+      - type: python
+        packages:
+          - viashpy
+
+runners:
+  - type: executable
+  - type: nextflow
+
+test_resources:
+  - type: python_script
+    path: test.py
diff --git a/src/datasets/loaders/scrnaseq/op3_loader/script.py b/src/datasets/loaders/scrnaseq/op3_loader/script.py
new file mode 100644
index 000000000..47ec65b9f
--- /dev/null
+++ b/src/datasets/loaders/scrnaseq/op3_loader/script.py
@@ -0,0 +1,298 @@
+import os
+import time
+import scanpy as sc
+import anndata as ad
+import pandas as pd
+import numpy as np
+import requests
+import logging
+
+logger = logging.getLogger(__name__)
+logging.basicConfig(level=logging.INFO)
+
+## VIASH START
+par = {
+    "input": "https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad",
+    "var_feature_name": "index",
+    "data_type": "sc",
+    "donor_id": None,
+    "cell_type": None,
+    "perturbation": None,
+    "dataset_id": "op3",
+    "dataset_name": "OP3: single-cell multimodal dataset in PBMCs for perturbation prediction benchmarking",
+    "dataset_summary": "The Open Problems Perturbation Prediction (OP3) dataset with small molecule perturbations in PBMCs",
+    "dataset_description": "The OP3 dataset is to date the largest single-cell small molecule perturbation dataset in primary tissue with multiple donor replicates.",
+    "dataset_url": "https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad",
+    "dataset_reference": "GSE279945",
+    "output": "output.h5ad",
+    "output_compression": "gzip"
+}
+meta = {}
+## VIASH END
+
+def download_file(url, destination, max_retries=5):
+    """Download a file from a URL to a destination, resuming partial downloads."""
+    # Create directory if it doesn't exist
+    os.makedirs(os.path.dirname(destination), exist_ok=True)
+
+    # Get file size from server
+    response = requests.head(url, timeout=60)
+    total_size_server = int(response.headers.get('content-length', 0))
+
+    # Check if file exists and is complete
+    if os.path.exists(destination):
+        file_size = os.path.getsize(destination)
+        if file_size == total_size_server:
+            logger.info(f"File already exists and is complete at {destination}, skipping download.")
+            return
+        elif file_size < total_size_server:
+            logger.info(f"Resuming download from byte {file_size} of {total_size_server}")
+        else:
+            logger.warning("Existing file is larger than expected. Restarting download.")
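+            # An oversized partial file cannot be completed with an HTTP Range
+            # request, so treat it as corrupt and re-download from byte 0.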
+            file_size = 0
+    else:
+        file_size = 0
+
+    # Try to download with retries
+    for attempt in range(max_retries):
+        try:
+            # Set up headers for resume
+            headers = {}
+            mode = 'wb'
+            if file_size > 0:
+                headers['Range'] = f'bytes={file_size}-'
+                mode = 'ab'  # Append to existing file
+
+            logger.info(f"Downloading {url} to {destination}")
+
+            # Make request with resume header if applicable
+            response = requests.get(url, headers=headers, stream=True, timeout=60)
+
+            # Handle resume response or normal response
+            if response.status_code == 206:  # Partial content
+                content_length = int(response.headers.get('content-length', 0))
+                total_size = content_length + file_size
+            elif response.status_code == 200:  # OK
+                total_size = int(response.headers.get('content-length', 0))
+                file_size = 0  # Reset file size for progress tracking
+            else:
+                logger.error(f"Unexpected status code: {response.status_code}")
+                continue
+
+            with open(destination, mode) as f:
+                for chunk in response.iter_content(chunk_size=1024 * 1024):  # 1MB chunks
+                    if chunk:
+                        f.write(chunk)
+
+            # Verify the downloaded size
+            if os.path.getsize(destination) >= total_size:
+                logger.info(f"Download completed: {destination}")
+                return
+            else:
+                logger.warning("Downloaded file size mismatch. Retrying...")
+                file_size = os.path.getsize(destination)
+
+        except (requests.exceptions.RequestException, IOError) as e:
+            logger.warning(f"Download error (attempt {attempt + 1}/{max_retries}): {str(e)}")
+            # Update file size for next attempt
+            if os.path.exists(destination):
+                file_size = os.path.getsize(destination)
+
+        # Wait before retrying
+        if attempt < max_retries - 1:
+            wait_time = 2 ** attempt  # Exponential backoff
+            logger.info(f"Waiting {wait_time} seconds before retrying...")
+            time.sleep(wait_time)
+
+    raise Exception(f"Failed to download {url} after {max_retries} attempts")
+
+def download_op3_data(url):
+    """Download the OP3 dataset from GEO."""
+    filename = "GSE279945_sc_counts_processed.h5ad"
+
+    cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "op3_loader")
+    os.makedirs(cache_dir, exist_ok=True)
+
+    destination = os.path.join(cache_dir, filename)
+    download_file(url, destination)
+
+    return destination
+
+def filter_op3_data(adata):
+    """
+    Filter the OP3 dataset based on specific criteria for each small molecule and cell type.
+    """
+    logger.info("Applying OP3-specific filtering criteria")
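+
+    # Each rule below ANDs an exclusion into a single boolean mask over .obs,
+    # so a cell is kept only if it passes every compound/cell type/donor check.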
+    # Create a boolean mask for filtering observations
+    obs_filt = np.ones(adata.n_obs, dtype=bool)
+
+    # Alvocidib: only T cells in only 2 donors, remove
+    obs_filt = obs_filt & (adata.obs['sm_name'] != "Alvocidib")
+
+    # BMS-387032: one donor with only T cells, two others consistent but with only 2 cell types.
+    # Keep the 2 cell types, remove donor 2 with only T cells
+    obs_filt = obs_filt & ~((adata.obs['sm_name'] == "BMS-387032") & (adata.obs['donor_id'] == "Donor 2"))
+
+    # BMS-387032: remove myeloid cells and B cells
+    obs_filt = obs_filt & ~((adata.obs['sm_name'] == "BMS-387032") &
+                            adata.obs['cell_type'].isin(["B cells", "Myeloid cells"]))
+
+    # CGP 60474: has only T cells left, remove
+    obs_filt = obs_filt & (adata.obs['sm_name'] != "CGP 60474")
+
+    # Canertinib: the variation of Myeloid cell proportions is very large, skip Myeloid
+    obs_filt = obs_filt & ~((adata.obs['sm_name'] == "Canertinib") &
+                            (adata.obs['cell_type'] == "Myeloid cells"))
+
+    # Foretinib: large variation in Myeloid cell proportions (some in T cells), skip Myeloid
+    obs_filt = obs_filt & ~((adata.obs['sm_name'] == "Foretinib") &
+                            (adata.obs['cell_type'] == "Myeloid cells"))
+
+    # Ganetespib (STA-9090): donor 2 has no Myeloid and small NK cell proportions, remove donor 2
+    obs_filt = obs_filt & ~((adata.obs['sm_name'] == "Ganetespib (STA-9090)") &
+                            (adata.obs['donor_id'] == "Donor 2"))
+
+    # IN1451: donor 2 has no NK or B cells, remove donor 2
+    obs_filt = obs_filt & ~((adata.obs['sm_name'] == "IN1451") &
+                            (adata.obs['donor_id'] == "Donor 2"))
+
+    # Navitoclax: donor 3 has no B cells and different T and Myeloid proportions, remove donor 3
+    obs_filt = obs_filt & ~((adata.obs['sm_name'] == "Navitoclax") &
+                            (adata.obs['donor_id'] == "Donor 3"))
+
+    # PF-04691502: remove Myeloid (only present in donor 3)
+    obs_filt = obs_filt & ~((adata.obs['sm_name'] == "PF-04691502") &
+                            (adata.obs['cell_type'] == "Myeloid cells"))
+
+    # Proscillaridin A;Proscillaridin-A: remove Myeloid, since the variation is very high (4x)
+    obs_filt = obs_filt & ~((adata.obs['sm_name'] == "Proscillaridin A;Proscillaridin-A") &
+                            (adata.obs['cell_type'] == "Myeloid cells"))
+
+    # R428: skip NK due to high variation (close to 3x)
+    obs_filt = obs_filt & ~((adata.obs['sm_name'] == "R428") &
+                            (adata.obs['cell_type'] == "NK cells"))
+
+    # UNII-BXU45ZH6LI: remove due to large variation across all cell types and missing cell types
+    obs_filt = obs_filt & (adata.obs['sm_name'] != "UNII-BXU45ZH6LI")
+
+    # Apply the filter
+    filtered_adata = adata[obs_filt].copy()
+
+    logger.info(f"Filtered data from {adata.n_obs} to {filtered_adata.n_obs} cells")
+
+    return filtered_adata
+
+def fix_splits(adata):
+    """Recompute train/test splits after cell type reannotation."""
+    logger.info("Fixing splits after reannotation")
+    adata.obs["cell_type_orig_updated"] = adata.obs["cell_type_orig"].apply(lambda x: "T cells" if x.startswith("T ") else x)
+    adata.obs["sm_cell_type_orig"] = adata.obs["sm_name"].astype(str) + "_" + adata.obs["cell_type_orig_updated"].astype(str)
+    mapping_to_split = adata.obs.groupby("sm_cell_type_orig")["split"].apply(lambda x: x.unique()[0]).to_dict()
+    adata.obs["sm_cell_type"] = adata.obs["sm_name"].astype(str) + "_" + adata.obs["cell_type"].astype(str)
+    adata.obs["split"] = adata.obs["sm_cell_type"].map(mapping_to_split)
+    adata.obs['control'] = adata.obs['split'].eq("control")
+    return adata
+
+
+def move_x_to_layers(adata):
+    """Move .X to .layers['counts'] and set .X to None."""
+    logger.info("Moving .X to .layers['counts']")
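+    # Keeping raw counts in .layers["counts"] and clearing .X lets downstream
+    # normalization components write their own layers without touching the
+    # original counts.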
adata.layers["counts"] = adata.X.copy() + adata.X = None + +def add_metadata_to_uns(adata, par): + """Add standardized metadata to .uns.""" + logger.info("Adding metadata to .uns") + adata.uns['dataset_id'] = par["dataset_id"] + adata.uns['dataset_name'] = par["dataset_name"] + adata.uns['dataset_summary'] = par["dataset_summary"] + adata.uns['dataset_description'] = par["dataset_description"] + adata.uns['dataset_organism'] = 'Homo sapiens' + adata.uns['dataset_url'] = par["dataset_url"] + adata.uns['dataset_reference'] = 'GSE279945' + adata.uns['dataset_version'] = '1.0.0' + adata.uns['processing_status'] = 'processed' + +def print_unique(adata, column): + """Print unique values in a column.""" + if column in adata.obs: + values = adata.obs[column].unique() + if len(values) <= 10: + formatted = "', '".join(values) + logger.info(f"Unique {column}: ['{formatted}']") + else: + logger.info(f"Unique {column}: {len(values)} values") + +def print_summary(adata): + """Print a summary of the dataset.""" + logger.info(f"Dataset shape: {adata.shape}") + logger.info(f"Number of cells: {adata.n_obs}") + logger.info(f"Number of genes: {adata.n_vars}") + + # Print unique values for key columns + for column in ['donor_id', 'cell_type', 'perturbation']: + print_unique(adata, column) + + logger.info(f"Layers: {list(adata.layers.keys())}") + logger.info(f"Metadata: {list(adata.uns.keys())}") + +def write_anndata(adata, par): + """Write AnnData object to file.""" + logger.info(f"Writing AnnData object to '{par['output']}'") + adata.write_h5ad(par["output"], compression=par["output_compression"]) + +# Instead of defining main() and calling it at the end, write the code directly +logger.info("Starting OP3 loader") + +# Download the data +logger.info(f"Downloading data from {par['input']}") +data_path = download_op3_data(par['input']) + +# Load the data +logger.info(f"Loading data from {data_path}") +adata = sc.read_h5ad(data_path) + +# Apply OP3-specific filtering +adata = filter_op3_data(adata) +# Filter by parameters +if par["donor_id"] is not None: + logger.info(f"Filtering for donor_id: {par['donor_id']}") + adata = adata[adata.obs["donor_id"] == par["donor_id"]] + +if par["cell_type"] is not None: + logger.info(f"Filtering for cell_type: {par['cell_type']}") + adata = adata[adata.obs["cell_type"] == par["cell_type"]] + +if par["perturbation"] is not None: + logger.info(f"Filtering for perturbation: {par['perturbation']}") + adata = adata[adata.obs["perturbation"] == par["perturbation"]] + +# Fix splits after reannotation +fix_splits(adata) + +# Move X to layers and normalize +move_x_to_layers(adata) + +# Add dataset metadata +add_metadata_to_uns(adata, par) + +# Add a feature name +logger.info("Setting .var['feature_name']") + +if par["var_feature_name"] == "index": + adata.var["feature_name"] = adata.var.index +else: + if par["var_feature_name"] in adata.var: + adata.var["feature_name"] = adata.var[par["feature_name"]] + del adata.var[par["feature_name"]] + else: + logger.info(f"Warning: key '{par['var_feature_name']}' could not be found in adata.var.") + +# Print summary and save +print_summary(adata) +write_anndata(adata, par) + +logger.info("Done") diff --git a/src/datasets/loaders/scrnaseq/op3_loader/test.py b/src/datasets/loaders/scrnaseq/op3_loader/test.py new file mode 100755 index 000000000..24ce953e0 --- /dev/null +++ b/src/datasets/loaders/scrnaseq/op3_loader/test.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 + +import os +import pytest +import anndata as ad +import numpy as np + +## VIASH START 
+meta = {
+    'resources_dir': './resources_test/',
+    'executable': './target/docker/datasets/loaders/scrnaseq/op3_loader',
+    'config': os.path.abspath('./src/datasets/loaders/scrnaseq/op3_loader/config.vsh.yaml')
+}
+## VIASH END
+
+def test_op3_loader(run_component, tmp_path):
+    """Test the OP3 loader."""
+    output_file = str(tmp_path / "output.h5ad")  # Convert to string to be safe
+
+    run_component([
+        "--input", "https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad",
+        "--var_feature_name", "index",
+        "--donor_id", "Donor 1",
+        "--cell_type", "T cells",
+        "--dataset_id", "test_op3",
+        "--dataset_name", "OP3 Test Dataset",
+        "--dataset_summary", "Test summary for OP3 dataset",
+        "--dataset_description", "Test description for OP3 dataset",
+        "--dataset_url", "https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad",
+        "--output", output_file,
+        "--output_compression", "gzip"
+    ])
+
+    # check whether file exists
+    assert os.path.exists(output_file), "Output file does not exist"
+
+    adata = ad.read_h5ad(output_file)
+
+    # check obs
+    assert not adata.obs.empty, ".obs should not be empty"
+    assert "donor_id" in adata.obs.columns
+    assert np.all(adata.obs["donor_id"] == "Donor 1"), "Not all cells are from donor 1"
+    assert "cell_type" in adata.obs.columns
+    assert np.all(adata.obs["cell_type"] == "T cells"), "Not all cells are T cells"
+    assert "perturbation" in adata.obs.columns
+    assert adata.n_obs > 0
+
+    # check var
+    assert not adata.var.empty, ".var should not be empty"
+    assert adata.n_vars > 0
+
+    # check layers
+    assert "counts" in adata.layers
+    assert adata.X is None, ".X should be empty after the counts are moved to .layers"
+
+    # check uns
+    assert adata.uns["dataset_id"] == "test_op3", "Incorrect .uns['dataset_id']"
+    assert adata.uns["dataset_name"] == "OP3 Test Dataset", "Incorrect .uns['dataset_name']"
+    assert adata.uns["dataset_summary"] == "Test summary for OP3 dataset", "Incorrect .uns['dataset_summary']"
+    assert adata.uns["dataset_description"] == "Test description for OP3 dataset", "Incorrect .uns['dataset_description']"
diff --git a/src/datasets/resource_scripts/op3_loader.sh b/src/datasets/resource_scripts/op3_loader.sh
new file mode 100755
index 000000000..b99446e37
--- /dev/null
+++ b/src/datasets/resource_scripts/op3_loader.sh
@@ -0,0 +1,54 @@
+#!/bin/bash
+# Example of how to set parameters and launch processing for the OP3 dataset.
+set -e
+
+params_file="/tmp/datasets_op3.yaml"
+
+cat > "$params_file" << 'HERE'
+param_list:
+  - id: op3
+    input: https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad
+    dataset_name: "OP3: single-cell multimodal dataset in PBMCs for perturbation prediction benchmarking"
+    dataset_summary: "The Open Problems Perturbation Prediction (OP3) dataset with small molecule perturbations in PBMCs"
+    dataset_description: "The OP3 dataset is to date the largest single-cell small molecule perturbation dataset in primary tissue with multiple donor replicates."
+    dataset_url: "https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad"
+    dataset_reference: GSE279945
+normalization_methods:
+  - log_cp10k
+output_dataset: '$id/dataset.h5ad'
+do_subsample: False
+output_meta: '$id/dataset_metadata.yaml'
+output_state: '$id/state.yaml'
+output_raw: force_null
+output_normalized: force_null
+output_pca: force_null
+output_hvg: force_null
+output_knn: force_null
+publish_dir: s3://openproblems-data/resources/datasets/op3_rnaseq
+HERE
+
+cat > "/tmp/nextflow.config" << 'HERE'
+process {
+  withName:'.*publishStatesProc' {
+    memory = '100GB'
+    disk = '1000GB'
+  }
+}
+HERE
+
+#tw launch https://github.com/openproblems-bio/openproblems.git \
+#  --revision main_build \
+#  --pull-latest \
+#  --main-script target/nextflow/datasets/workflows/scrnaseq/process_op3/main.nf \
+#  --workspace 53907369739130 \
+#  --params-file "$params_file" \
+#  --labels op3_loader,dataset_loader \
+#  --config /tmp/nextflow.config
+#
+set -x
+nextflow run . \
+  -main-script target/nextflow/datasets/workflows/scrnaseq/process_op3/main.nf \
+  -profile docker \
+  -resume \
+  -params-file "$params_file" \
+  -config /tmp/nextflow.config
diff --git a/src/datasets/resource_scripts/op3_loader_test.sh b/src/datasets/resource_scripts/op3_loader_test.sh
new file mode 100755
index 000000000..682c4f4d4
--- /dev/null
+++ b/src/datasets/resource_scripts/op3_loader_test.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+REPO_ROOT=$(git rev-parse --show-toplevel)
+cd "$REPO_ROOT"
+
+DATASET_DIR=op3_data/
+
+set -e
+mkdir -p "$DATASET_DIR"
+
+cat > "/tmp/nextflow.config" << 'HERE'
+process {
+  withName:'.*publishStatesProc' {
+    memory = '100GB'
+    disk = '1000GB'
+  }
+}
+HERE
+
+set -x
+nextflow run . \
+  -main-script target/nextflow/datasets/workflows/scrnaseq/process_op3/main.nf \
+  -profile docker \
+  -resume \
+  --input https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad \
+  --id op3 \
+  --dataset_name "OP3: single-cell multimodal dataset in PBMCs for perturbation prediction benchmarking" \
+  --dataset_summary "The Open Problems Perturbation Prediction (OP3) dataset with small molecule perturbations in PBMCs" \
+  --dataset_description "The OP3 dataset is to date the largest single-cell small molecule perturbation dataset in primary tissue with multiple donor replicates." \
+  --dataset_url "https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad" \
+  --dataset_reference GSE279945 \
+  --do_subsample False \
+  --normalization_methods log_cp10k \
+  --output_dataset '$id/dataset.h5ad' \
+  --output_meta '$id/dataset_metadata.yaml' \
+  --output_state '$id/state.yaml' \
+  --output_raw force_null \
+  --output_normalized force_null \
+  --output_pca force_null \
+  --output_hvg force_null \
+  --output_knn force_null \
+  --publish_dir "$DATASET_DIR" \
+  -config /tmp/nextflow.config
diff --git a/src/datasets/workflows/scrnaseq/process_op3/config.vsh.yaml b/src/datasets/workflows/scrnaseq/process_op3/config.vsh.yaml
new file mode 100644
index 000000000..3c741d1ec
--- /dev/null
+++ b/src/datasets/workflows/scrnaseq/process_op3/config.vsh.yaml
@@ -0,0 +1,161 @@
+name: process_op3
+namespace: datasets/workflows/scrnaseq
+description: |
+  Fetch, filter, normalize, and prepare the Open Problems Perturbation
+  Prediction (OP3) dataset for benchmarking and analysis.
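+
+# The workflow wires op3_loader into optional subsampling, the selected
+# normalization methods, HVG selection, PCA, and a KNN graph (see main.nf).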
+argument_groups:
+  - name: Input options
+    description: "Options for selecting input data"
+    arguments:
+      - name: --input
+        type: string
+        description: "Link to the initial input dataset"
+        required: false
+        default: https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad
+        example: https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad
+      - name: --data_type
+        type: string
+        description: "Data type to load"
+        default: sc
+        choices: [sc]
+      - name: --donor_id
+        type: string
+        description: "Filter to a specific donor"
+        required: false
+      - name: --cell_type
+        type: string
+        description: "Filter to a specific cell type"
+        required: false
+      - name: --perturbation
+        type: string
+        description: "Filter to a specific perturbation"
+        required: false
+  - name: Dataset metadata
+    description: "Information about the dataset that will be stored in the `.uns` slot."
+    arguments:
+      - name: --id
+        type: string
+        description: "Unique identifier."
+        required: true
+        default: op3
+      - name: --dataset_name
+        type: string
+        description: "Nicely formatted name."
+        required: true
+        default: "OP3: single-cell multimodal dataset in PBMCs for perturbation prediction benchmarking"
+      - name: --dataset_url
+        type: string
+        description: "Link to the original source of the dataset."
+        required: false
+        default: https://ftp.ncbi.nlm.nih.gov/geo/series/GSE279nnn/GSE279945/suppl/GSE279945_sc_counts_processed.h5ad
+      - name: --dataset_reference
+        type: string
+        description: "Reference to the publication or accession in which the dataset was released."
+        required: false
+        default: GSE279945
+      - name: --dataset_summary
+        type: string
+        description: "Short description of the dataset."
+        required: true
+        default: "The Open Problems Perturbation Prediction (OP3) dataset with small molecule perturbations in PBMCs"
+      - name: --dataset_description
+        type: string
+        description: "Long description of the dataset."
+        required: true
+        default: "The OP3 dataset is to date the largest single-cell small molecule perturbation
+          dataset in primary tissue with multiple donor replicates."
+  - name: Sampling options
+    arguments:
+      - name: --do_subsample
+        type: boolean
+        default: false
+        description: "Whether or not to subsample the dataset."
+      - name: --n_obs
+        type: integer
+        description: "Maximum number of observations to be kept. It might end up being
+          less because empty cells / genes are removed."
+        default: 500
+      - name: --n_vars
+        type: integer
+        description: "Maximum number of variables to be kept. It might end up being
+          less because empty cells / genes are removed."
+        default: 500
+      - name: --keep_features
+        type: string
+        multiple: true
+        description: "A list of genes to keep."
+      - name: --keep_cell_type_categories
+        type: string
+        multiple: true
+        description: "Cell type categories to be selected."
+        required: false
+      - name: --keep_batch_categories
+        type: string
+        multiple: true
+        description: "Batch categories to be selected."
+        required: false
+      - name: --even
+        type: boolean_true
+        description: "Subsample evenly from different batches."
+      - name: --seed
+        type: integer
+        description: "A seed for the subsampling."
+        example: 123
+  - name: Normalization
+    arguments:
+      - name: --normalization_methods
+        type: string
+        multiple: true
+        choices: [log_cp10k, log_cpm, sqrt_cp10k, sqrt_cpm, l1_sqrt, log_scran_pooling]
+        default: [log_cp10k, log_cpm, sqrt_cp10k, sqrt_cpm, l1_sqrt]
+        description: "Which normalization methods to run."
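+  # The output arguments below reuse the shared dataset API schemas via
+  # `__merge__`; setState at the end of main.nf only emits the outputs that
+  # were requested.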
+  - name: Outputs
+    arguments:
+      - name: --output_dataset
+        __merge__: /src/datasets/api/file_common_dataset.yaml
+        direction: output
+        required: true
+      - name: --output_meta
+        type: file
+        direction: output
+        required: false
+        default: dataset_metadata.yaml
+        description: "Dataset metadata."
+      - name: --output_raw
+        __merge__: /src/datasets/api/file_raw.yaml
+        direction: output
+        required: false
+      - name: --output_normalized
+        __merge__: /src/datasets/api/file_normalized.yaml
+        direction: output
+        required: false
+      - name: --output_pca
+        __merge__: /src/datasets/api/file_pca.yaml
+        direction: output
+        required: false
+      - name: --output_hvg
+        __merge__: /src/datasets/api/file_hvg.yaml
+        direction: output
+        required: false
+      - name: --output_knn
+        __merge__: /src/datasets/api/file_knn.yaml
+        direction: output
+        required: false
+resources:
+  - type: nextflow_script
+    path: main.nf
+    entrypoint: run_wf
+  - path: /common/nextflow_helpers/helper.nf
+dependencies:
+  - name: datasets/loaders/scrnaseq/op3_loader
+  - name: datasets/normalization/log_cp
+  - name: datasets/normalization/log_scran_pooling
+  - name: datasets/normalization/sqrt_cp
+  - name: datasets/normalization/l1_sqrt
+  - name: datasets/processors/subsample
+  - name: datasets/processors/pca
+  - name: datasets/processors/hvg
+  - name: datasets/processors/knn
+  - name: utils/extract_uns_metadata
+runners:
+  - type: nextflow
diff --git a/src/datasets/workflows/scrnaseq/process_op3/main.nf b/src/datasets/workflows/scrnaseq/process_op3/main.nf
new file mode 100644
index 000000000..2d7f83a5a
--- /dev/null
+++ b/src/datasets/workflows/scrnaseq/process_op3/main.nf
@@ -0,0 +1,156 @@
+include { findArgumentSchema } from "${meta.resources_dir}/helper.nf"
+
+workflow auto {
+  findStates(params, meta.config)
+    | meta.workflow.run(
+      auto: [publish: "state"]
+    )
+}
+
+workflow run_wf {
+  take:
+  input_ch
+
+  main:
+
+  // create different normalization methods by overriding the defaults
+  normalization_methods = [
+    log_cp.run(
+      key: "log_cp10k",
+      args: [normalization_id: "log_cp10k", n_cp: 10000],
+    ),
+    log_cp.run(
+      key: "log_cpm",
+      args: [normalization_id: "log_cpm", n_cp: 1000000],
+    ),
+    sqrt_cp.run(
+      key: "sqrt_cp10k",
+      args: [normalization_id: "sqrt_cp10k", n_cp: 10000],
+    ),
+    sqrt_cp.run(
+      key: "sqrt_cpm",
+      args: [normalization_id: "sqrt_cpm", n_cp: 1000000],
+    ),
+    l1_sqrt.run(
+      key: "l1_sqrt",
+      args: [normalization_id: "l1_sqrt"],
+    ),
+    log_scran_pooling.run(
+      key: "log_scran_pooling",
+      args: [normalization_id: "log_scran_pooling"],
+    )
+  ]
+
+  output_ch = input_ch
+
+    // store original id for later use
+    | map{ id, state ->
+      [id, state + [_meta: [join_id: id]]]
+    }
+
+    // fetch data from the OP3 dataset
+    | op3_loader.run(
+      fromState: [
+        "input": "input",
+        "data_type": "data_type",
+        "donor_id": "donor_id",
+        "cell_type": "cell_type",
+        "perturbation": "perturbation",
+        "dataset_id": "id",
+        "dataset_name": "dataset_name",
+        "dataset_url": "dataset_url",
+        "dataset_reference": "dataset_reference",
+        "dataset_summary": "dataset_summary",
+        "dataset_description": "dataset_description"
+      ],
+      toState: ["output_raw": "output"]
+    )
+
+    // subsample if so desired
+    | subsample.run(
+      runIf: { id, state -> state.do_subsample },
+      fromState: [
+        "input": "output_raw",
+        "n_obs": "n_obs",
+        "n_vars": "n_vars",
+        "keep_features": "keep_features",
+        "keep_cell_type_categories": "keep_cell_type_categories",
+        "keep_batch_categories": "keep_batch_categories",
+        "even": "even",
+        "seed": "seed"
+      ],
+      args: [output_mod2: null],
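+      // the subsampled file replaces output_raw in the state, so later steps
+      // behave the same whether or not subsampling ran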
toState: ["output_raw": "output"] + ) + + | runEach( + components: normalization_methods, + id: { id, state, comp -> + if (state.normalization_methods.size() > 1) { + id + "/" + comp.name + } else { + id + } + }, + filter: { id, state, comp -> + comp.name in state.normalization_methods + }, + fromState: ["input": "output_raw"], + toState: { id, output, state, comp -> + state + [ + output_normalized: output.output, + normalization_id: comp.name + ] + } + ) + + | hvg.run( + fromState: ["input": "output_normalized"], + toState: ["output_hvg": "output"] + ) + + | pca.run( + fromState: ["input": "output_hvg"], + toState: ["output_pca": "output" ] + ) + + | knn.run( + fromState: ["input": "output_pca"], + toState: ["output_knn": "output"] + ) + + // add synonym + | map{ id, state -> + [id, state + [output_dataset: state.output_knn]] + } + + | extract_uns_metadata.run( + fromState: { id, state -> + def schema = findArgumentSchema(meta.config, "output_dataset") + // workaround: convert GString to String + schema = iterateMap(schema, { it instanceof GString ? it.toString() : it }) + def schemaYaml = tempFile("schema.yaml") + writeYaml(schema, schemaYaml) + [ + "input": state.output_dataset, + "schema": schemaYaml + ] + }, + toState: ["output_meta": "output"] + ) + + // only output the files for which an output file was specified + | setState([ + "output_dataset", + "output_meta", + "output_raw", + "output_normalized", + "output_pca", + "output_hvg", + "output_knn", + "_meta" + ]) + + emit: + output_ch +}