LREN-CHUV/data-factory-airflow-dags

Airflow MRI preprocessing DAGs

Requirements:

  • airflow-imaging-plugins
  • mri-preprocessing-pipeline
  • data-tracking

To see this project in action, go to the demo of MIP Data Factory and follow the instructions.

Setup and configuration

Airflow setup for MRI scans pipeline:

  • Create the following pools:

    • image_preprocessing with N slots, where N is less than the number of vCPUs available on the machine
    • remote_file_copy with N slots, where N should be 1 or 2 to avoid saturating network IO
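As a rough guide, the pool sizes above can be derived from the machine's vCPU count. The helper below is a hypothetical sketch (not part of this repository) of the sizing rule stated in the two bullets:

```python
import os

def suggested_pool_slots(vcpus):
    """Return (image_preprocessing, remote_file_copy) slot counts.

    image_preprocessing leaves one vCPU free for Airflow itself;
    remote_file_copy is capped at 2 to avoid saturating network IO.
    """
    return max(1, vcpus - 1), min(2, max(1, vcpus))

print(suggested_pool_slots(os.cpu_count() or 1))
```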
  • In Airflow config file, add a [spm] section with the following entries:

    • SPM_DIR: path to the root folder of SPM 12.
  • In Airflow config file, add a [mipmap] section with the following entries (required only if MipMap is used for ETL):

    • DB_CONFIG_FILE: path to the configuration file used by MipMap to connect to its work database.
  • In Airflow config file, add a [data-factory] section with the following entries:

    • DATASETS: comma-separated list of datasets to process. Each dataset is configured using a [data-factory:<dataset>] section in the config file
    • EMAIL_ERRORS_TO: email address to send errors to
    • SLACK_CHANNEL: optional, Slack channel to use to send status messages
    • SLACK_CHANNEL_USER: optional, user to post as in Slack
    • SLACK_TOKEN: optional, authorisation token for Slack
    • DATA_CATALOG_SQL_ALCHEMY_CONN: connection URL to the Data catalog database tracking artifacts generated by the MRI pipelines.
    • I2B2_SQL_ALCHEMY_CONN: connection URL to the I2B2 database storing all the MRI pipelines results.
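The wiring between DATASETS and the per-dataset sections can be sketched with Python's standard configparser. The excerpt below is a made-up minimal config, not the full airflow.cfg:

```python
from configparser import ConfigParser

# Hypothetical minimal excerpt of an airflow.cfg with a [data-factory] section.
cfg_text = """
[data-factory]
datasets = main, extra
email_errors_to = admin@example.org

[data-factory:main]
dataset_label = Demo

[data-factory:extra]
dataset_label = Extra
"""

parser = ConfigParser()
parser.read_string(cfg_text)

# DATASETS is a comma-separated list; each entry maps to its own
# [data-factory:<dataset>] section.
datasets = [d.strip() for d in parser.get("data-factory", "datasets").split(",")]
labels = {d: parser.get("data-factory:%s" % d, "dataset_label") for d in datasets}
print(labels)  # {'main': 'Demo', 'extra': 'Extra'}
```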
  • For each dataset, add a [data-factory:<dataset>] section, replacing <dataset> with the name of the dataset and define the following entries:

    • DATASET_LABEL: Name of the dataset
  • For each dataset, configure the [data-factory:<dataset>:reorganisation] section if you need to reorganise an input folder whose files or folders should be split into several folders (for example, one per visit):

    • INPUT_FOLDER: Folder containing the original imaging data to process. These data should already have been anonymised by a tool
    • INPUT_FOLDER_DEPTH: depth of folders to explore while scanning the original imaging data to process.
    • INPUT_CONFIG: List of flags defining how incoming imaging data are organised, values are defined below in the preprocessing section.
    • MAX_ACTIVE_RUNS: maximum number of reorganisation tasks in parallel
    • FOLDER_FILTER: regex that describes acceptable folder names. Folders that do not fully match it will be discarded.
    • PIPELINES: List of pipelines to execute. Values are
      • copy_to_local: if used, input data are first copied to a local folder to speed up processing.
      • dicom_reorganise:
        • output_folder: output folder that will contain the reorganised data.
        • output_folder_structure: description of the desired folder organisation. E.g. '#PatientID/#StudyID/#SeriesDescription/#SeriesNumber'
        • docker_image: organiser docker image.
        • docker_input_dir: docker input volume for the organiser (path inside the container).
        • docker_output_dir: docker output volume for the organiser (path inside the container).
        • allowed_field_values: list of fields with restricted set of values used to filter out unwanted images, e.g. FIELD=VALUE1,VALUE2,VALUE3 [FIELD2=VALUE1,VALUE2 ...]
      • nifti_reorganise:
        • output_folder: output folder that will contain the reorganised data.
        • output_folder_structure: description of the desired folder organisation. E.g. '#PatientID/#StudyID/#SeriesDescription/#SeriesNumber'
        • docker_image: organiser docker image.
        • docker_input_dir: docker input volume for the organiser (path inside the container).
        • docker_output_dir: docker output volume for the organiser (path inside the container).
      • trigger_preprocessing: scans the current folder and triggers preprocessing of images for each folder discovered
      • trigger_ehr: scans the current folder and triggers importation of EHR data for each folder discovered
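A minimal sketch of two concepts above, assuming full-match semantics for FOLDER_FILTER and simple #Token substitution for output_folder_structure. The real logic lives in the organiser Docker image; `build_output_path` and the sample attributes are illustrative only:

```python
import re

def build_output_path(structure, attributes):
    """Expand '#Token' placeholders with the image's meta-data attributes."""
    return re.sub(r"#(\w+)", lambda m: str(attributes[m.group(1)]), structure)

attrs = {"PatientID": "sub-001", "StudyID": "study-42",
         "SeriesDescription": "t1_mpr", "SeriesNumber": 3}
print(build_output_path("#PatientID/#StudyID/#SeriesDescription/#SeriesNumber", attrs))
# sub-001/study-42/t1_mpr/3

# FOLDER_FILTER uses full-match semantics: partial matches are discarded.
folder_filter = re.compile(r"\d{8}_[A-Za-z0-9]+")
print(bool(folder_filter.fullmatch("20170102_subj01")))  # True
print(bool(folder_filter.fullmatch("stray_notes")))      # False
```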
  • If trigger_preprocessing is used, configure the [data-factory:<dataset>:reorganisation:trigger_preprocessing] section:

    • DEPTH: depth of folders to explore when triggering preprocessing
  • If trigger_ehr is used, configure the [data-factory:<dataset>:reorganisation:trigger_ehr] section:

    • DEPTH: depth of folders to explore when triggering importation of EHR data
  • For each dataset, now configure the [data-factory:<dataset>:preprocessing] section:

    • INPUT_FOLDER: Folder containing the original imaging data to process. These data should already have been anonymised by a tool. Not required when the reorganisation pipelines have been used beforehand.
    • INPUT_CONFIG: List of flags defining how incoming imaging data are organised, values are
      • boost: optional. When enabled, all files in the same folder are assumed to share the same meta-data, which makes processing about twice as fast. This option is enabled by default.
      • session_id_by_patient: rarely, a dataset uses study IDs that are unique per patient only (not across the whole study), e.g. LREN data. In that case, enable this flag to use PatientID + StudyID as the session ID.
      • visit_id_in_patient_id: rarely, a dataset mixes patient IDs and visit IDs, e.g. LREN data. In that case, enable this flag to try to split PatientID into VisitID and PatientID.
      • visit_id_from_path: Enable this flag to get the visit ID from the folder hierarchy instead of DICOM meta-data (e.g. can be useful for PPMI).
      • repetition_from_path: Enable this flag to get the repetition ID from the folder hierarchy instead of DICOM meta-data (e.g. can be useful for PPMI).
    • MAX_ACTIVE_RUNS: maximum number of folders containing scans to pre-process in parallel
    • MIN_FREE_SPACE: minimum percentage of free space available on local disk
    • MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines.
    • PIPELINES_PATH: path to the root folder containing the Matlab scripts for the pipelines
    • PROTOCOLS_DEFINITION_FILE: path to the default protocols definition file defining the protocols used on the scanner.
    • SCANNERS: List of methods describing how the preprocessing data folder is scanned for new work, values are
      • continuous: input folder is scanned frequently for new data. Sub-folders should contain a .ready file to indicate that processing can be performed on that folder.
      • daily: input folder contains a sub-folder for the year, this folder contains daily sub-folders for each day of the year (format yyyyMMdd). Those daily sub-folders in turn contain the folders for each scan to process.
      • once: input folder contains a set of sub-folders each containing a scan to process.
    • PIPELINES: List of pipelines to execute. Values are
      • copy_to_local: if used, input data are first copied to a local folder to speed up processing.
      • dicom_to_nifti: convert all DICOM files to Nifti format.
      • mpm_maps: computes the Multiparametric Maps (MPMs) and brain segmentation in different tissue maps.
      • neuro_morphometric_atlas: computes an individual Atlas based on the NeuroMorphometrics Atlas.
      • export_features: exports neuroimaging features stored in CSV files to the I2B2 database
      • catalog_to_i2b2: exports meta-data from the data catalog to the I2B2 database.
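The effect of the session_id_by_patient and visit_id_in_patient_id flags can be sketched as below. The helpers and the '<visit>_<patient>' convention are made up for illustration; the actual splitting logic lives in the data-tracking library:

```python
def session_id(patient_id, study_id, session_id_by_patient=False):
    """With session_id_by_patient, StudyID is only unique per patient,
    so the session ID combines both identifiers."""
    return "%s_%s" % (patient_id, study_id) if session_id_by_patient else study_id

def split_visit_id(patient_id, visit_id_in_patient_id=False):
    """With visit_id_in_patient_id, PatientID is assumed to embed the
    visit ID (here as '<visit>_<patient>', a made-up convention)."""
    if visit_id_in_patient_id and "_" in patient_id:
        visit, patient = patient_id.split("_", 1)
        return visit, patient
    return None, patient_id

print(session_id("P01", "S1", session_id_by_patient=True))     # P01_S1
print(split_visit_id("V02_P01", visit_id_in_patient_id=True))  # ('V02', 'P01')
```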
  • If copy_to_local is used, configure the [data-factory:<dataset>:preprocessing:copy_to_local] section:

    • OUTPUT_FOLDER: destination folder for the local copy
  • If dicom_to_nifti is used or required (when DICOM images are used as input), configure the [data-factory:<dataset>:preprocessing:dicom_to_nifti] section:

    • OUTPUT_FOLDER: destination folder for the Nifti images
    • BACKUP_FOLDER: backup folder for the Nifti images
    • SPM_FUNCTION: SPM function called. Defaults to 'DCM2NII_LREN'
    • PIPELINE_PATH: path to the folder containing the SPM script for this pipeline. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/Nifti_Conversion_Pipeline'
    • MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines. Defaults to the MISC_LIBRARY_PATH value in the [data-factory:<dataset>:preprocessing] section.
    • PROTOCOLS_DEFINITION_FILE: path to the Protocols definition file defining the protocols used on the scanner. Defaults to the PROTOCOLS_DEFINITION_FILE value in the [data-factory:<dataset>:preprocessing] section.
    • DCM2NII_PROGRAM: path to the DCM2NII program. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/dcm2nii'
  • If mpm_maps is used, configure the [data-factory:<dataset>:preprocessing:mpm_maps] section:

    • OUTPUT_FOLDER: destination folder for the MPMs and brain segmentation
    • BACKUP_FOLDER: backup folder for the MPMs and brain segmentation
    • SPM_FUNCTION: SPM function called. Defaults to 'Preproc_mpm_maps'
    • PIPELINE_PATH: path to the folder containing the SPM script for this pipeline. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/MPMs_Pipeline'
    • MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines. Defaults to the MISC_LIBRARY_PATH value in the [data-factory:<dataset>:preprocessing] section.
    • PROTOCOLS_DEFINITION_FILE: path to the Protocols definition file defining the protocols used on the scanner. Defaults to the PROTOCOLS_DEFINITION_FILE value in the [data-factory:<dataset>:preprocessing] section.
  • If neuro_morphometric_atlas is used, configure the [data-factory:<dataset>:preprocessing:neuro_morphometric_atlas] section:

    • OUTPUT_FOLDER: destination folder for the Atlas file, the volumes of the Morphometric Atlas structures (.txt) and the CSV file containing the volumes, globals and Multiparametric Maps (R2*, R1, MT, PD) for each structure defined in the Subject Atlas.
    • BACKUP_FOLDER: backup folder for the same outputs as OUTPUT_FOLDER.
    • SPM_FUNCTION: SPM function called. Defaults to 'NeuroMorphometric_pipeline'
    • PIPELINE_PATH: path to the folder containing the SPM script for this pipeline. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/NeuroMorphometric_Pipeline/NeuroMorphometric_tbx/label'
    • MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines. Defaults to the MISC_LIBRARY_PATH value in the [data-factory:<dataset>:preprocessing] section.
    • PROTOCOLS_DEFINITION_FILE: path to the Protocols definition file defining the protocols used on the scanner. Defaults to the PROTOCOLS_DEFINITION_FILE value in the [data-factory:<dataset>:preprocessing] section.
    • TPM_TEMPLATE: path to the template used for the segmentation step in case the image is not segmented. Defaults to SPM_DIR + 'tpm/nwTPM_sl3.nii'
  • For each dataset, now configure the [data-factory:<dataset>:ehr] section:

    • INPUT_FOLDER: Folder containing the original EHR data to process. These data should already have been anonymised by a tool
    • INPUT_FOLDER_DEPTH: when a once scanner is used, indicates the depth of folders to traverse before reaching EHR data. Defaults to 1.
    • MIN_FREE_SPACE: minimum percentage of free space available on local disk
    • SCANNERS: List of methods describing how the EHR data folder is scanned for new work, values are
      • daily: input folder contains a sub-folder for the year, this folder contains daily sub-folders for each day of the year (format yyyyMMdd). Those daily sub-folders in turn contain the EHR files in CSV format to process.
      • once: input folder contains the EHR files in CSV format to process.
    • PIPELINES: List of pipelines to execute. Values are
      • map_ehr_to_i2b2: maps EHR data to an I2B2 schema using the configured Docker image.
  • Configure the [data-factory:<dataset>:ehr:map_ehr_to_i2b2] section:

    • DOCKER_IMAGE: Docker image of the tool that maps EHR data to an I2B2 schema.
  • Configure the [data-factory:<dataset>:ehr:version_incoming_ehr] section:

    • OUTPUT_FOLDER: output folder used to store versioned EHR data.
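MIN_FREE_SPACE above is a fraction of the local disk. A guard of that kind can be sketched with the standard library; `has_enough_free_space` is a hypothetical helper, not the repository's implementation:

```python
import shutil

def has_enough_free_space(path, min_free_fraction):
    """Return True when the free fraction of the disk holding `path`
    is at least min_free_fraction (e.g. 0.3 for 30%)."""
    usage = shutil.disk_usage(path)
    return usage.free / usage.total >= min_free_fraction

print(has_enough_free_space("/", 0.0))  # True: any disk has at least 0% free
```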

Sample configuration:


[spm]
spm_dir = /opt/spm12
[mipmap]
db_config_file = /dev/null
[data-factory]
data_catalog_sql_alchemy_conn = postgresql://data_catalog:datacatalogpwd@demo:4321/data_catalog
datasets = main
email_errors_to =
i2b2_sql_alchemy_conn = postgresql://i2b2:i2b2pwd@demo:4321/i2b2
slack_channel = #data
slack_channel_user = Airflow
slack_token =
[data-factory:main]
dataset_label = Demo
[data-factory:main:preprocessing]
input_config = boost
input_folder = /data/demo
max_active_runs = 30
min_free_space = 0.3
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
pipelines = dicom_to_nifti,mpm_maps,neuro_morphometric_atlas,export_features,catalog_to_i2b2
pipelines_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
scanners = once
[data-factory:main:preprocessing:copy_to_local]
output_folder = /data/incoming
[data-factory:main:preprocessing:dicom_to_nifti]
backup_folder =
dcm2nii_program = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines/Nifti_Conversion_Pipeline/dcm2nii
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
output_folder = /data/nifti
pipeline_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
spm_function = DCM2NII_LREN
[data-factory:main:preprocessing:mpm_maps]
backup_folder =
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
output_folder = /data/mpm_maps
pipeline_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
spm_function = Preproc_mpm_maps
[data-factory:main:preprocessing:neuro_morphometric_atlas]
backup_folder =
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
output_folder = /data/neuro_morphometric_atlas
pipeline_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
spm_function = NeuroMorphometric_pipeline
tpm_template = /opt/spm12/tpm/nwTPM_sl3.nii
[data-factory:main:ehr]
input_folder = /data/ehr_demo
input_folder_depth = 0
max_active_runs = 30
min_free_space = 0.3
pipelines =
scanners = once
[data-factory:main:ehr:map_ehr_to_i2b2]
docker_image = hbpmip/mipmap-demo-ehr-to-i2b2:0.1
[data-factory:main:ehr:version_incoming_ehr]
output_folder = /data/ehr_versioned
[data-factory:main:reorganisation]
input_config = boost
input_folder = /data/demo
max_active_runs = 30
min_free_space = 0.3
pipelines = dicom_reorganise,trigger_preprocessing
[data-factory:main:reorganisation:copy_to_local]
output_folder = /data/all_incoming
[data-factory:main:reorganisation:dicom_reorganise]
docker_image = hbpmip/hierarchizer:1.3.6
docker_input_dir = /input_folder
docker_output_dir = /output_folder
output_folder = /data/dicom_organised
output_folder_structure = #PatientID/#StudyID/#SeriesDescription/#SeriesNumber
[data-factory:main:reorganisation:nifti_reorganise]
docker_image = hbpmip/hierarchizer:1.3.6
docker_input_dir = /input_folder
docker_output_dir = /output_folder
output_folder = /data/nifti_organised
output_folder_structure = #PatientID/#StudyID/#SeriesDescription/#SeriesNumber
[data-factory:main:reorganisation:trigger_preprocessing]
depth = 1
[data-factory:main:reorganisation:trigger_ehr]
depth = 0

Acknowledgements

This work has been funded by the European Union Seventh Framework Program (FP7/2007-2013) under grant agreement no. 604102 (HBP).

This work is part of SP8 of the Human Brain Project (SGA1).

About

DAGs adapting the MRI preprocessing pipeline to Airflow