Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions clinica/iotools/converters/adni_to_bids/adni_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
import pandas as pd

from clinica.utils.exceptions import ClinicaXMLParserError
from clinica.utils.stream import cprint
from clinica.utils.stream import cprint, log_and_raise

__all__ = ["create_json_metadata"]


LOGGING_HEADER = "[ADNI JSON]"


def _log_and_raise(message: str):
cprint(f"{LOGGING_HEADER} {message}", lvl="error")
raise ClinicaXMLParserError(f"{LOGGING_HEADER} {message}")
def _log_and_raise_xml_parser_error(message: str):
log_and_raise(f"{LOGGING_HEADER} {message}", ClinicaXMLParserError)


# Map from names of extracted metadata to their proper BIDS names.
Expand Down Expand Up @@ -65,7 +64,9 @@ def _read_xml_files(
def _check_xml_tag(element_tag: str, expected_tag: str):
"""Check that the XML element tagged matches the expected tag."""
if element_tag != expected_tag:
_log_and_raise(f"Bad tag: expected {expected_tag}, got {element_tag}")
_log_and_raise_xml_parser_error(
f"Bad tag: expected {expected_tag}, got {element_tag}"
)


def _check_xml_nb_children(
Expand All @@ -78,13 +79,13 @@ def _check_xml_nb_children(
nb_children = len(xml_el)
if isinstance(expected_nb_children, int):
if nb_children != expected_nb_children:
_log_and_raise(
_log_and_raise_xml_parser_error(
f"Bad number of children for <{xml_el.tag}>: "
f"got {nb_children} != {expected_nb_children}"
)
else:
if nb_children not in expected_nb_children:
_log_and_raise(
_log_and_raise_xml_parser_error(
f"Bad number of children for <{xml_el.tag}>: "
f"got {nb_children}, not in {expected_nb_children}"
)
Expand Down Expand Up @@ -112,8 +113,8 @@ def _parse_project(
Check that the project identifier contains ADNI.
"""
if len(root) != 1:
_log_and_raise(
"XML root should have only one child. " f"{len(root)} children found."
_log_and_raise_xml_parser_error(
"XML root should have only one child. " f"{len(root)} children found.",
)
project = _check_xml(root[0], "project", 4)
_check_xml_project_identifier(project, "ADNI")
Expand All @@ -138,7 +139,7 @@ def _get_text(xml_element: xml.etree.ElementTree.Element, cast=None) -> str:
def _check_derived_image_xml(derived: xml.etree.ElementTree.Element) -> None:
"""Perform sanity checks on derived images."""
if len(derived) < 9:
_log_and_raise("Derived image does not have enough field.")
_log_and_raise_xml_parser_error("Derived image does not have enough field.")
for idx, field, expected in zip(
[2, 3, 4, 5, 6],
["imageType", "tissue", "hemisphere", "anatomicStructure", "registration"],
Expand All @@ -152,15 +153,17 @@ def _check_derived_image_xml(derived: xml.etree.ElementTree.Element) -> None:
try:
_validate_date_iso_format(image_create_date)
except ValueError as e:
_log_and_raise(f"The creationDate for the derived image is not valid: {e}")
_log_and_raise_xml_parser_error(
f"The creationDate for the derived image is not valid: {e}"
)


def _check_xml_field(
xml_element: xml.etree.ElementTree.Element, field: str, expected_value: str
) -> None:
"""Check that the given field of the given XML element has the expected value."""
if (found_value := _check_xml_and_get_text(xml_element, field)) != expected_value:
_log_and_raise(
_log_and_raise_xml_parser_error(
f"The {field} for the derived image should be '{expected_value}'. "
f"{found_value} was found instead."
)
Expand Down Expand Up @@ -193,7 +196,7 @@ def _check_xml_project_identifier(
):
"""Check the project identifier."""
if _check_xml_and_get_text(project[0], "projectIdentifier") != expected:
_log_and_raise(f"Not {expected} cohort")
_log_and_raise_xml_parser_error(f"Not {expected} cohort")


def _get_original_image_metadata(original_image: xml.etree.ElementTree.Element) -> dict:
Expand Down Expand Up @@ -248,7 +251,9 @@ def _get_root_from_xml_path(xml_path: Path) -> xml.etree.ElementTree.Element:
root = tree.getroot()
return root
except Exception as e:
_log_and_raise(f"Error parsing XML file {xml_path.name} :\n{e}")
_log_and_raise_xml_parser_error(
f"Error parsing XML file {xml_path.name} :\n{e}"
)


def _parse_series(
Expand Down Expand Up @@ -294,13 +299,13 @@ def _check_image(img: xml.etree.ElementTree.Element):
"imagingProtocol", # for original image metadata
"originalRelatedImage", # for processed image
}:
_log_and_raise(
_log_and_raise_xml_parser_error(
f"Bad image tag <{img.tag}>. "
"Should be either 'imagingProtocol' or 'originalRelatedImage'."
)
if len(img) not in {3, 4}:
_log_and_raise(
f"Image XML element has {len(img)} children. " "Expected either 3 or 4."
_log_and_raise_xml_parser_error(
f"Image XML element has {len(img)} children. " "Expected either 3 or 4.",
)


Expand Down Expand Up @@ -361,7 +366,9 @@ def _check_modality(study: xml.etree.ElementTree.Element, expected_modality: str
series = _parse_series(study)
modality = _check_xml_and_get_text(series[1], "modality")
if modality != expected_modality:
_log_and_raise(f"Unexpected modality {modality}, expected {expected_modality}.")
_log_and_raise_xml_parser_error(
f"Unexpected modality {modality}, expected {expected_modality}."
)


def _parse_subject(
Expand Down Expand Up @@ -418,7 +425,9 @@ def _parse_xml_file(xml_path: Path) -> dict:
**derived_image_metadata,
}
if "image_proc_id" not in scan_metadata and "image_orig_id" not in scan_metadata:
_log_and_raise(f"Scan metadata for subject {subject_id} has no image ID.")
_log_and_raise_xml_parser_error(
f"Scan metadata for subject {subject_id} has no image ID."
)
return scan_metadata


Expand Down
16 changes: 16 additions & 0 deletions clinica/utils/stream.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""This module handles stream and log redirection."""

from enum import Enum
from typing import Type


class LoggingLevel(str, Enum):
Expand Down Expand Up @@ -38,3 +39,18 @@ def cprint(msg: str, lvl: str = "info") -> None:
logger.critical(msg=msg)
else:
pass


def log_and_raise(message: str, error_type: Type[Exception]):
"""Log the error message using cprint and raise.

Parameters
----------
message : str
The error message.

error_type : Exception
The error type to raise.
"""
cprint(message, lvl="error")
raise error_type(message)