import json
from pathlib import Path

from epilepsy2bids.annotations import Annotations
import numpy as np
from timescoring import scoring
from timescoring.annotations import Annotation


class Result(scoring._Scoring):
    """Helper class built on top of scoring._Scoring that implements the sum
    operator between two scoring objects. The sum corresponds to the
    concatenation of both objects.

    Args:
        score (scoring._Scoring, optional): if None, all counts are initialized
            to zero; otherwise they are copied from the given scoring._Scoring
            object.
    """

    def __init__(self, score: scoring._Scoring = None):
        if score is None:
            self.fs = 0
            self.duration = 0
            self.numSamples = 0
            self.tp = 0
            self.fp = 0
            self.refTrue = 0
        else:
            self.fs = score.ref.fs
            self.duration = len(score.ref.mask) / score.ref.fs
            self.numSamples = score.numSamples
            self.tp = score.tp
            self.fp = score.fp
            self.refTrue = score.refTrue

    def __add__(self, other_result: scoring._Scoring):
        new_result = Result()
        new_result.fs = other_result.fs
        new_result.duration = self.duration + other_result.duration
        new_result.numSamples = self.numSamples + other_result.numSamples
        new_result.tp = self.tp + other_result.tp
        new_result.fp = self.fp + other_result.fp
        new_result.refTrue = self.refTrue + other_result.refTrue

        return new_result

    def __iadd__(self, other_result: scoring._Scoring):
        self.fs = other_result.fs
        self.duration += other_result.duration
        self.numSamples += other_result.numSamples
        self.tp += other_result.tp
        self.fp += other_result.fp
        self.refTrue += other_result.refTrue

        return self


def evaluate_dataset(
    reference: Path, hypothesis: Path, outFile: Path, avg_per_subject=True
) -> dict:
    """
    Compares two sets of seizure annotations across a full dataset.

    Parameters:
        reference (Path): The path to the folder containing the reference TSV files.
        hypothesis (Path): The path to the folder containing the hypothesis TSV files.
        outFile (Path): The path to the output JSON file where the results are saved.
        avg_per_subject (bool): Whether to average scores per subject (True) or to
            pool counts across the full dataset (False).

    Returns:
        dict: The evaluation results. The dictionary contains the following keys:
            {'sample_results': {'sensitivity', 'precision', 'f1', 'fpRate',
                                'sensitivity_std', 'precision_std', 'f1_std',
                                'fpRate_std'},
             'event_results': {...}
            }
            The *_std keys are only present when avg_per_subject is True.
    """

    FS = 1
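    # FS = 1 Hz: reference and hypothesis are compared on per-second binary masks.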

    sample_results = dict()
    event_results = dict()
    for subject in Path(reference).glob("sub-*"):
        sample_results[subject.name] = Result()
        event_results[subject.name] = Result()

        for ref_tsv in subject.glob("**/*.tsv"):
            # Load reference
            ref = Annotations.loadTsv(ref_tsv)
            ref = Annotation(ref.getMask(FS), FS)

            # Load hypothesis
            hyp_tsv = Path(hypothesis) / ref_tsv.relative_to(reference)
            if hyp_tsv.exists():
                hyp = Annotations.loadTsv(hyp_tsv)
                hyp = Annotation(hyp.getMask(FS), FS)
            else:
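                # Missing hypothesis file: assume no seizures were predicted and
                # score against an all-zero mask of the reference's length.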
                hyp = Annotation(np.zeros_like(ref.mask), ref.fs)

            # Compute evaluation
            sample_score = scoring.SampleScoring(ref, hyp)
            event_score = scoring.EventScoring(ref, hyp)

            # Store results
            sample_results[subject.name] += Result(sample_score)
            event_results[subject.name] += Result(event_score)

        # Compute scores
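        # computeScores() (inherited from scoring._Scoring) derives sensitivity,
        # precision, f1 and fpRate from the accumulated tp, fp, refTrue and
        # duration counts.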
        sample_results[subject.name].computeScores()
        event_results[subject.name].computeScores()

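    # Aggregate the per-subject results: either report the mean and standard
    # deviation of the per-subject metrics, or pool all counts and compute a
    # single set of metrics over the whole dataset.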
    aggregated_sample_results = dict()
    aggregated_event_results = dict()
    if avg_per_subject:
        for result_builder, aggregated_result in zip(
            (sample_results, event_results),
            (aggregated_sample_results, aggregated_event_results),
        ):
            for metric in ["sensitivity", "precision", "f1", "fpRate"]:
                aggregated_result[metric] = np.mean(
                    [getattr(x, metric) for x in result_builder.values()]
                )
                aggregated_result[f"{metric}_std"] = np.std(
                    [getattr(x, metric) for x in result_builder.values()]
                )
    else:
        for result_builder, aggregated_result in zip(
            (sample_results, event_results),
            (aggregated_sample_results, aggregated_event_results),
        ):
            # Accumulate into a local Result first so that the "cumulated" entry
            # is not added to itself while iterating over the dictionary values.
            cumulated = Result()
            for result in result_builder.values():
                cumulated += result
            cumulated.computeScores()
            result_builder["cumulated"] = cumulated
            for metric in ["sensitivity", "precision", "f1", "fpRate"]:
                aggregated_result[metric] = getattr(cumulated, metric)

    output = {
        "sample_results": aggregated_sample_results,
        "event_results": aggregated_event_results,
    }
    with open(outFile, "w") as file:
        json.dump(output, file, indent=2, sort_keys=False)

    return output
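

# Minimal usage sketch (illustrative only; the folder and file names below are
# hypothetical). Both folders must share the same directory structure
# (sub-*/**/*.tsv), since each hypothesis file is looked up at the relative
# path of its reference counterpart.
if __name__ == "__main__":
    results = evaluate_dataset(
        reference=Path("reference_annotations"),
        hypothesis=Path("hypothesis_annotations"),
        outFile=Path("results.json"),
        avg_per_subject=True,
    )
    print(json.dumps(results["sample_results"], indent=2))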