Commit d6d8f64

Skip tasks or error out when resources are missing
- If tasks are selected with --task or --task-filter and a needed resource is missing, we now error out.
- If tasks are not specifically selected, we now only run tasks for which resources are available (and error out if no resources exist at all).
- If --allow-missing-resources is passed, we skip both of the above checks and keep the historical behavior of running any given task with zero input rows (which pushes up a schema, cleans up the delta lake, and stamps the resource as complete in the completion-tracking tables).
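
The rules above roughly reduce to the following sketch. It is illustrative only; the function and argument names are hypothetical, not the actual implementation (see check_available_resources in cumulus_etl/etl/cli.py below).

# Hypothetical sketch of the selection rules above (not the cumulus-etl code itself).
def reconcile(
    requested: set[str],
    detected: set[str] | None,
    tasks_explicitly_selected: bool,
    allow_missing: bool,
) -> set[str]:
    if allow_missing or detected is None:
        # historical behavior: run every selected task, even with zero input rows
        return requested
    missing = requested - detected
    if not missing:
        return requested
    if tasks_explicitly_selected:
        # --task / --task-filter case: a needed resource is absent, so error out
        raise SystemExit("Required resources not found.")
    # default-task case: only keep tasks whose resources were actually found
    remaining = requested - missing
    if not remaining:
        raise SystemExit("No supported resources found.")
    return remaining
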
1 parent 3256616 commit d6d8f64

File tree

12 files changed: +195 lines, -45 lines


cumulus_etl/errors.py

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@
 SERVICE_MISSING = 33  # generic init-check service is missing
 COMPLETION_ARG_MISSING = 34
 TASK_HELP = 35
+MISSING_REQUESTED_RESOURCES = 36
 
 
 class FatalError(Exception):

cumulus_etl/etl/cli.py

Lines changed: 67 additions & 5 deletions
@@ -2,11 +2,12 @@
 
 import argparse
 import datetime
+import logging
 import os
 import shutil
 import sys
-from collections.abc import Iterable
 
+import cumulus_fhir_support
 import rich
 import rich.table
 
@@ -16,6 +17,9 @@
 from cumulus_etl.etl.config import JobConfig, JobSummary
 from cumulus_etl.etl.tasks import task_factory
 
+TaskList = list[type[tasks.EtlTask]]
+
+
 ###############################################################################
 #
 # Main Pipeline (run all tasks)
@@ -24,7 +28,7 @@
 
 
 async def etl_job(
-    config: JobConfig, selected_tasks: list[type[tasks.EtlTask]], use_philter: bool = False
+    config: JobConfig, selected_tasks: TaskList, use_philter: bool = False
 ) -> list[JobSummary]:
     """
     :param config: job config
@@ -68,7 +72,7 @@ def check_mstool() -> None:
         raise SystemExit(errors.MSTOOL_MISSING)
 
 
-async def check_requirements(selected_tasks: Iterable[type[tasks.EtlTask]]) -> None:
+async def check_requirements(selected_tasks: TaskList) -> None:
     """
     Verifies that all external services and programs are ready
 
@@ -123,6 +127,11 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
     parser.add_argument(
         "--errors-to", metavar="DIR", help="where to put resources that could not be processed"
     )
+    parser.add_argument(
+        "--allow-missing-resources",
+        action="store_true",
+        help="run tasks even if their resources are not present",
+    )
 
     cli_utils.add_aws(parser)
     cli_utils.add_auth(parser)
@@ -148,7 +157,7 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
 
 
 def print_config(
-    args: argparse.Namespace, job_datetime: datetime.datetime, all_tasks: Iterable[tasks.EtlTask]
+    args: argparse.Namespace, job_datetime: datetime.datetime, all_tasks: TaskList
 ) -> None:
     """
     Prints the ETL configuration to the console.
@@ -219,6 +228,49 @@ def handle_completion_args(
     return export_group_name, export_datetime
 
 
+async def check_available_resources(
+    loader: loaders.Loader,
+    *,
+    requested_resources: set[str],
+    args: argparse.Namespace,
+    is_default_tasks: bool,
+) -> set[str]:
+    # Here we try to reconcile which resources the user requested and which resources are actually
+    # available in the input root.
+    # - If the user didn't specify a specific task, we'll scope down the requested resources to
+    #   what is actually present in the input.
+    # - If they did, we'll complain if their required resources are not available.
+    #
+    # Reconciling is helpful for performance reasons (don't need to finalize untouched tables),
+    # UX reasons (can tell user if they made a CLI mistake), and completion tracking (don't
+    # mark a resource as complete if we didn't even export it)
+    if args.allow_missing_resources:
+        return requested_resources
+
+    detected = await loader.detect_resources()
+    if detected is None:
+        return requested_resources  # likely we haven't run bulk export yet
+
+    if missing_resources := requested_resources - detected:
+        for resource in sorted(missing_resources):
+            # Log the same message we would print if in common.py if we ran tasks anyway
+            logging.warning("No %s files found in %s", resource, loader.root.path)
+
+        if is_default_tasks:
+            requested_resources -= missing_resources  # scope down to detected resources
+            if not requested_resources:
+                errors.fatal(
+                    f"No supported resources found.",
+                    errors.MISSING_REQUESTED_RESOURCES,
+                )
+        else:
+            msg = f"Required resources not found.\n"
+            msg += "Add --allow-missing-resources to run related tasks anyway with no input."
+            errors.fatal(msg, errors.MISSING_REQUESTED_RESOURCES)
+
+    return requested_resources
+
+
 async def etl_main(args: argparse.Namespace) -> None:
     # Set up some common variables
 
@@ -232,6 +284,7 @@ async def etl_main(args: argparse.Namespace) -> None:
     job_datetime = common.datetime_now()  # grab timestamp before we do anything
 
     selected_tasks = task_factory.get_selected_tasks(args.task, args.task_filter)
+    is_default_tasks = not args.task and not args.task_filter
 
     # Print configuration
     print_config(args, job_datetime, selected_tasks)
@@ -266,8 +319,17 @@ async def etl_main(args: argparse.Namespace) -> None:
         resume=args.resume,
     )
 
+    required_resources = await check_available_resources(
+        config_loader,
+        args=args,
+        is_default_tasks=is_default_tasks,
+        requested_resources=required_resources,
+    )
+    # Drop any tasks that we didn't find resources for
+    selected_tasks = [t for t in selected_tasks if t.resource in required_resources]
+
     # Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
-    loader_results = await config_loader.load_all(list(required_resources))
+    loader_results = await config_loader.load_resources(required_resources)
 
     # Establish the group name and datetime of the loaded dataset (from CLI args or Loader)
     export_group_name, export_datetime = handle_completion_args(args, loader_results)

cumulus_etl/loaders/base.py

Lines changed: 9 additions & 1 deletion
@@ -46,7 +46,15 @@ def __init__(self, root: store.Root):
         self.root = root
 
     @abc.abstractmethod
-    async def load_all(self, resources: list[str]) -> LoaderResults:
+    async def detect_resources(self) -> set[str] | None:
+        """
+        Inspect which resources are available for use.
+
+        :returns: the types of resources detected (or None if that can't be determined yet)
+        """
+
+    @abc.abstractmethod
+    async def load_resources(self, resources: set[str]) -> LoaderResults:
         """
         Loads the listed remote resources and places them into a local folder as FHIR ndjson

cumulus_etl/loaders/fhir/bulk_export.py

Lines changed: 2 additions & 2 deletions
@@ -30,7 +30,7 @@ class BulkExporter:
     def __init__(
         self,
         client: fhir.FhirClient,
-        resources: list[str],
+        resources: set[str],
         url: str,
         destination: str,
         *,
@@ -81,7 +81,7 @@ def format_kickoff_url(
         self,
         url: str,
         *,
-        resources: list[str],
+        resources: set[str],
         since: str | None,
         until: str | None,
         prefer_url_resources: bool,

cumulus_etl/loaders/fhir/ndjson_loader.py

Lines changed: 14 additions & 3 deletions
@@ -2,6 +2,8 @@
 
 import tempfile
 
+import cumulus_fhir_support
+
 from cumulus_etl import cli_utils, common, errors, fhir, store
 from cumulus_etl.loaders import base
 from cumulus_etl.loaders.fhir.bulk_export import BulkExporter
@@ -37,7 +39,16 @@ def __init__(
         self.until = until
         self.resume = resume
 
-    async def load_all(self, resources: list[str]) -> base.LoaderResults:
+    async def detect_resources(self) -> set[str] | None:
+        if self.root.protocol in {"http", "https"}:
+            return None  # we haven't done the export yet!
+
+        found_files = cumulus_fhir_support.list_multiline_json_in_dir(
+            self.root.path, fsspec_fs=self.root.fs
+        )
+        return {resource for resource in found_files.values() if resource}
+
+    async def load_resources(self, resources: set[str]) -> base.LoaderResults:
         # Are we doing a bulk FHIR export from a server?
         if self.root.protocol in ["http", "https"]:
             bulk_dir = await self.load_from_bulk_export(resources)
@@ -61,14 +72,14 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:
         # TemporaryDirectory gets discarded), but that seems reasonable.
         print("Copying ndjson input files…")
         tmpdir = tempfile.TemporaryDirectory()
-        filenames = common.ls_resources(input_root, set(resources), warn_if_empty=True)
+        filenames = common.ls_resources(input_root, resources, warn_if_empty=True)
         for filename in filenames:
             input_root.get(filename, f"{tmpdir.name}/")
 
         return self.read_loader_results(input_root, tmpdir)
 
     async def load_from_bulk_export(
-        self, resources: list[str], prefer_url_resources: bool = False
+        self, resources: set[str], prefer_url_resources: bool = False
     ) -> common.Directory:
         """
         Performs a bulk export and drops the results in an export dir.
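
For illustration, the local-directory detection above roughly behaves as follows. The directory path and the example output are hypothetical, and fsspec_fs is omitted here, assuming the local-filesystem default.

# Hypothetical usage of the detection logic shown above; paths and output are examples only.
import cumulus_fhir_support

found_files = cumulus_fhir_support.list_multiline_json_in_dir("/tmp/export")
# e.g. {"/tmp/export/Patient.000.ndjson": "Patient",
#       "/tmp/export/Condition.000.ndjson": "Condition",
#       "/tmp/export/log.txt": None}
detected = {resource for resource in found_files.values() if resource}
# e.g. {"Patient", "Condition"}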

cumulus_etl/loaders/i2b2/loader.py

Lines changed: 24 additions & 4 deletions
@@ -34,7 +34,27 @@ def __init__(self, root: store.Root, export_to: str | None = None):
         super().__init__(root)
         self.export_to = export_to
 
-    async def load_all(self, resources: list[str]) -> base.LoaderResults:
+    async def detect_resources(self) -> set[str] | None:
+        if self.root.protocol in {"tcp"}:
+            return None  # we haven't done the export yet!
+
+        filenames = {
+            "observation_fact_diagnosis.csv": "Condition",
+            "observation_fact_lab_views.csv": "Observation",
+            "observation_fact_medications.csv": "MedicationRequest",
+            "observation_fact_notes.csv": "DocumentReference",
+            "observation_fact_vitals.csv": "Observation",
+            "patient_dimension.csv": "Patient",
+            "visit_dimension.csv": "Encounter",
+        }
+
+        return {
+            resource
+            for path, resource in filenames.items()
+            if self.root.exists(self.root.joinpath(path))
+        }
+
+    async def load_resources(self, resources: set[str]) -> base.LoaderResults:
         if self.root.protocol in ["tcp"]:
             directory = self._load_all_from_oracle(resources)
         else:
@@ -43,7 +63,7 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:
 
     def _load_all_with_extractors(
         self,
-        resources: list[str],
+        resources: set[str],
         conditions: I2b2ExtractorCallable,
         lab_views: I2b2ExtractorCallable,
         medicationrequests: I2b2ExtractorCallable,
@@ -139,7 +159,7 @@ def _loop(
     #
     ###################################################################################################################
 
-    def _load_all_from_csv(self, resources: list[str]) -> common.Directory:
+    def _load_all_from_csv(self, resources: set[str]) -> common.Directory:
         path = self.root.path
         return self._load_all_with_extractors(
             resources,
@@ -177,7 +197,7 @@ def _load_all_from_csv(self, resources: list[str]) -> common.Directory:
     #
     ###################################################################################################################
 
-    def _load_all_from_oracle(self, resources: list[str]) -> common.Directory:
+    def _load_all_from_oracle(self, resources: set[str]) -> common.Directory:
         path = self.root.path
         return self._load_all_with_extractors(
             resources,

cumulus_etl/upload_notes/downloader.py

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ async def download_docrefs_from_fhir_server(
     else:
         # else we'll download the entire target path as a bulk export (presumably the user has scoped a Group)
         ndjson_loader = loaders.FhirNdjsonLoader(root_input, client, export_to=export_to)
-        return await ndjson_loader.load_all(["DocumentReference"])
+        return await ndjson_loader.load_resources({"DocumentReference"})
 
 
 async def _download_docrefs_from_fake_ids(

tests/etl/test_etl_cli.py

Lines changed: 38 additions & 14 deletions
@@ -103,17 +103,17 @@ async def test_failed_task(self):
 
     async def test_single_task(self):
         # Grab all observations before we mock anything
-        observations = loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_all(
-            ["Observation"]
+        observations = loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_resources(
+            {"Observation"}
         )
 
-        def fake_load_all(internal_self, resources):
+        def fake_load_resources(internal_self, resources):
             del internal_self
             # Confirm we only tried to load one resource
-            self.assertEqual(["Observation"], resources)
+            self.assertEqual({"Observation"}, resources)
             return observations
 
-        with mock.patch.object(loaders.FhirNdjsonLoader, "load_all", new=fake_load_all):
+        with mock.patch.object(loaders.FhirNdjsonLoader, "load_resources", new=fake_load_resources):
             await self.run_etl(tasks=["observation"])
 
         # Confirm we only wrote the one resource
@@ -126,17 +126,17 @@ def fake_load_all(internal_self, resources):
 
     async def test_multiple_tasks(self):
         # Grab all observations before we mock anything
-        loaded = loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_all(
-            ["Observation", "Patient"]
+        loaded = loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_resources(
+            {"Observation", "Patient"}
         )
 
-        def fake_load_all(internal_self, resources):
+        def fake_load_resources(internal_self, resources):
             del internal_self
             # Confirm we only tried to load two resources
-            self.assertEqual({"Observation", "Patient"}, set(resources))
+            self.assertEqual({"Observation", "Patient"}, resources)
             return loaded
 
-        with mock.patch.object(loaders.FhirNdjsonLoader, "load_all", new=fake_load_all):
+        with mock.patch.object(loaders.FhirNdjsonLoader, "load_resources", new=fake_load_resources):
             await self.run_etl(tasks=["observation", "patient"])
 
         # Confirm we only wrote the two resources
@@ -267,16 +267,18 @@ async def test_task_init_checks(self, mock_check):
     async def test_completion_args(self, etl_args, loader_vals, expected_vals):
         """Verify that we parse completion args with the correct fallbacks and checks."""
         # Grab all observations before we mock anything
-        observations = await loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_all(
-            ["Observation"]
+        observations = await loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_resources(
+            {"Observation"}
         )
         observations.group_name = loader_vals[0]
         observations.export_datetime = loader_vals[1]
 
         with (
             self.assertRaises(SystemExit) as cm,
             mock.patch("cumulus_etl.etl.cli.etl_job", side_effect=SystemExit) as mock_etl_job,
-            mock.patch.object(loaders.FhirNdjsonLoader, "load_all", return_value=observations),
+            mock.patch.object(
+                loaders.FhirNdjsonLoader, "load_resources", return_value=observations
+            ),
         ):
             await self.run_etl(tasks=["observation"], **etl_args)
 
@@ -297,14 +299,36 @@ async def test_deleted_ids_passed_down(self):
         with (
             self.assertRaises(SystemExit),
             mock.patch("cumulus_etl.etl.cli.etl_job", side_effect=SystemExit) as mock_etl_job,
-            mock.patch.object(loaders.FhirNdjsonLoader, "load_all", return_value=results),
+            mock.patch.object(loaders.FhirNdjsonLoader, "load_resources", return_value=results),
         ):
             await self.run_etl(tasks=["observation"])
 
         self.assertEqual(mock_etl_job.call_count, 1)
         config = mock_etl_job.call_args[0][0]
         self.assertEqual({"Observation": {"obs1"}}, config.deleted_ids)
 
+    @ddt.data(["patient"], None)
+    async def test_missing_resources(self, tasks):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            with self.assertRaises(SystemExit) as cm:
+                await self.run_etl(tasks=tasks, input_path=tmpdir)
+            self.assertEqual(errors.MISSING_REQUESTED_RESOURCES, cm.exception.code)
+
+    async def test_allow_missing_resources(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            await self.run_etl("--allow-missing-resources", tasks=["patient"], input_path=tmpdir)
+
+        self.assertEqual("", common.read_text(f"{self.output_path}/patient/patient.000.ndjson"))
+
+    async def test_missing_resources_skips_tasks(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            common.write_json(f"{tmpdir}/p.ndjson", {"id": "A", "resourceType": "Patient"})
+            await self.run_etl(input_path=tmpdir)
+
+        self.assertEqual(
+            {"etl__completion", "patient", "JobConfig"}, set(os.listdir(self.output_path))
+        )
+
 
 class TestEtlJobConfig(BaseEtlSimple):
     """Test case for the job config logging data"""
