Commit 6262610
Skip tasks or error out when resources are missing
- If tasks are selected with --task or --task-filter and a needed resource is
  missing, we now error out.
- If tasks are not specifically selected, we now only run tasks for which
  resources are available (and error out if no resources exist at all).
- If --allow-missing-resources is passed, we skip both checks above and keep the
  historical behavior of running any given task with zero input rows (which
  pushes up a schema, cleans up the delta lake, and stamps the resource as
  complete in the completion-tracking tables).
1 parent ceb9ce7 commit 6262610
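
As a rough sketch, the decision logic described above boils down to something like the following (illustrative only; the real implementation is `check_available_resources()` in the `cumulus_etl/etl/cli.py` diff below):

```python
def reconcile(requested: set[str], detected: set[str] | None,
              explicitly_selected: bool, allow_missing: bool) -> set[str]:
    """Illustrative only: mirrors the three behaviors in the commit message."""
    if allow_missing or detected is None:
        # Historical behavior: run every task, even ones with zero input rows.
        return requested
    missing = requested - detected
    if missing and explicitly_selected:  # user passed --task or --task-filter
        raise SystemExit("Required resources not found.")
    remaining = requested - missing  # default tasks: quietly skip what's absent
    if not remaining:
        raise SystemExit("No supported resources found.")
    return remaining
```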

13 files changed (+199, -48 lines)

cumulus_etl/errors.py

Lines changed: 1 addition & 0 deletions

@@ -34,6 +34,7 @@
 SERVICE_MISSING = 33  # generic init-check service is missing
 COMPLETION_ARG_MISSING = 34
 TASK_HELP = 35
+MISSING_REQUESTED_RESOURCES = 36


 class FatalError(Exception):
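
The new code is surfaced through the existing `errors.fatal()` helper, as the `cli.py` hunk below shows. A minimal sketch of the call (the message text mirrors the one in this commit):

```python
from cumulus_etl import errors

# Prints the message and exits the process with status 36
errors.fatal("Required resources not found.", errors.MISSING_REQUESTED_RESOURCES)
```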

cumulus_etl/etl/cli.py

Lines changed: 66 additions & 5 deletions

@@ -2,10 +2,10 @@

 import argparse
 import datetime
+import logging
 import os
 import shutil
 import sys
-from collections.abc import Iterable

 import rich
 import rich.table
@@ -16,6 +16,9 @@
 from cumulus_etl.etl.config import JobConfig, JobSummary
 from cumulus_etl.etl.tasks import task_factory

+TaskList = list[type[tasks.EtlTask]]
+
+
 ###############################################################################
 #
 # Main Pipeline (run all tasks)
@@ -24,7 +27,7 @@


 async def etl_job(
-    config: JobConfig, selected_tasks: list[type[tasks.EtlTask]], use_philter: bool = False
+    config: JobConfig, selected_tasks: TaskList, use_philter: bool = False
 ) -> list[JobSummary]:
     """
     :param config: job config
@@ -68,7 +71,7 @@ def check_mstool() -> None:
         raise SystemExit(errors.MSTOOL_MISSING)


-async def check_requirements(selected_tasks: Iterable[type[tasks.EtlTask]]) -> None:
+async def check_requirements(selected_tasks: TaskList) -> None:
     """
     Verifies that all external services and programs are ready

@@ -118,6 +121,11 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
     parser.add_argument(
         "--errors-to", metavar="DIR", help="where to put resources that could not be processed"
     )
+    parser.add_argument(
+        "--allow-missing-resources",
+        action="store_true",
+        help="run tasks even if their resources are not present",
+    )

     cli_utils.add_aws(parser)
     cli_utils.add_auth(parser)
@@ -143,7 +151,7 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:


 def print_config(
-    args: argparse.Namespace, job_datetime: datetime.datetime, all_tasks: Iterable[tasks.EtlTask]
+    args: argparse.Namespace, job_datetime: datetime.datetime, all_tasks: TaskList
 ) -> None:
     """
     Prints the ETL configuration to the console.
@@ -214,6 +222,49 @@ def handle_completion_args(
     return export_group_name, export_datetime


+async def check_available_resources(
+    loader: loaders.Loader,
+    *,
+    requested_resources: set[str],
+    args: argparse.Namespace,
+    is_default_tasks: bool,
+) -> set[str]:
+    # Here we try to reconcile which resources the user requested and which resources are actually
+    # available in the input root.
+    # - If the user didn't specify a specific task, we'll scope down the requested resources to
+    #   what is actually present in the input.
+    # - If they did, we'll complain if their required resources are not available.
+    #
+    # Reconciling is helpful for performance reasons (don't need to finalize untouched tables),
+    # UX reasons (can tell user if they made a CLI mistake), and completion tracking (don't
+    # mark a resource as complete if we didn't even export it)
+    if args.allow_missing_resources:
+        return requested_resources
+
+    detected = await loader.detect_resources()
+    if detected is None:
+        return requested_resources  # likely we haven't run bulk export yet
+
+    if missing_resources := requested_resources - detected:
+        for resource in sorted(missing_resources):
+            # Log the same message we would print in common.py if we ran tasks anyway
+            logging.warning("No %s files found in %s", resource, loader.root.path)
+
+        if is_default_tasks:
+            requested_resources -= missing_resources  # scope down to detected resources
+            if not requested_resources:
+                errors.fatal(
+                    "No supported resources found.",
+                    errors.MISSING_REQUESTED_RESOURCES,
+                )
+        else:
+            msg = "Required resources not found.\n"
+            msg += "Add --allow-missing-resources to run related tasks anyway with no input."
+            errors.fatal(msg, errors.MISSING_REQUESTED_RESOURCES)
+
+    return requested_resources
+
+
 async def etl_main(args: argparse.Namespace) -> None:
     # Set up some common variables

@@ -227,6 +278,7 @@ async def etl_main(args: argparse.Namespace) -> None:
     job_datetime = common.datetime_now()  # grab timestamp before we do anything

     selected_tasks = task_factory.get_selected_tasks(args.task, args.task_filter)
+    is_default_tasks = not args.task and not args.task_filter

     # Print configuration
     print_config(args, job_datetime, selected_tasks)
@@ -261,8 +313,17 @@ async def etl_main(args: argparse.Namespace) -> None:
         resume=args.resume,
     )

+    required_resources = await check_available_resources(
+        config_loader,
+        args=args,
+        is_default_tasks=is_default_tasks,
+        requested_resources=required_resources,
+    )
+    # Drop any tasks that we didn't find resources for
+    selected_tasks = [t for t in selected_tasks if t.resource in required_resources]
+
     # Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
-    loader_results = await config_loader.load_all(list(required_resources))
+    loader_results = await config_loader.load_resources(required_resources)

     # Establish the group name and datetime of the loaded dataset (from CLI args or Loader)
     export_group_name, export_datetime = handle_completion_args(args, loader_results)
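
To see the new reconciliation behavior in action, here is a minimal sketch using a hypothetical stub loader (`StubLoader` and the paths/resource names are invented for illustration, not part of this commit):

```python
import argparse
import asyncio

from cumulus_etl import store
from cumulus_etl.etl import cli
from cumulus_etl.loaders import base


class StubLoader(base.Loader):
    """Pretends that only Patient and Encounter input files exist."""

    async def detect_resources(self) -> set[str] | None:
        return {"Patient", "Encounter"}

    async def load_resources(self, resources: set[str]) -> base.LoaderResults:
        raise NotImplementedError  # not exercised by this sketch


args = argparse.Namespace(allow_missing_resources=False)
loader = StubLoader(store.Root("/tmp/export"))

# Default tasks: the missing Condition resource is logged and dropped.
kept = asyncio.run(cli.check_available_resources(
    loader,
    requested_resources={"Patient", "Encounter", "Condition"},
    args=args,
    is_default_tasks=True,
))
assert kept == {"Patient", "Encounter"}

# Explicit --task selection: the same gap is now fatal (exit code 36).
asyncio.run(cli.check_available_resources(
    loader,
    requested_resources={"Condition"},
    args=args,
    is_default_tasks=False,
))  # raises SystemExit via errors.fatal()
```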

cumulus_etl/etl/tasks/base.py

Lines changed: 0 additions & 3 deletions

@@ -260,9 +260,6 @@ def _delete_requested_ids(self):
             self.formatters[index].delete_records(deleted_ids)

     def _update_completion_table(self) -> None:
-        # TODO: what about empty sets - do we assume the export gave 0 results or skip it?
-        # Is there a difference we could notice? (like empty input file vs no file at all)
-
         if not self.completion_tracking_enabled:
             return

cumulus_etl/loaders/base.py

Lines changed: 9 additions & 1 deletion

@@ -46,7 +46,15 @@ def __init__(self, root: store.Root):
         self.root = root

     @abc.abstractmethod
-    async def load_all(self, resources: list[str]) -> LoaderResults:
+    async def detect_resources(self) -> set[str] | None:
+        """
+        Inspect which resources are available for use.
+
+        :returns: the types of resources detected (or None if that can't be determined yet)
+        """
+
+    @abc.abstractmethod
+    async def load_resources(self, resources: set[str]) -> LoaderResults:
         """
         Loads the listed remote resources and places them into a local folder as FHIR ndjson
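
Note the contract: None means "can't inspect yet" (e.g. a bulk export that hasn't run), while an empty set means "inspected and found nothing". A hypothetical helper showing how a caller might branch on that distinction:

```python
from cumulus_etl.loaders import base


async def usable_resources(loader: base.Loader, requested: set[str]) -> set[str]:
    """Hypothetical helper: trust the request when detection can't run yet."""
    detected = await loader.detect_resources()
    if detected is None:
        # Nothing local to scan yet, so assume everything requested may appear.
        return requested
    # Otherwise, only keep the resources that are actually present.
    return requested & detected
```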

cumulus_etl/loaders/fhir/bulk_export.py

Lines changed: 2 additions & 2 deletions

@@ -30,7 +30,7 @@ class BulkExporter:
     def __init__(
         self,
         client: fhir.FhirClient,
-        resources: list[str],
+        resources: set[str],
         url: str,
         destination: str,
         *,
@@ -81,7 +81,7 @@ def format_kickoff_url(
         self,
         url: str,
         *,
-        resources: list[str],
+        resources: set[str],
         since: str | None,
         until: str | None,
         prefer_url_resources: bool,

cumulus_etl/loaders/fhir/ndjson_loader.py

Lines changed: 14 additions & 3 deletions

@@ -2,6 +2,8 @@

 import tempfile

+import cumulus_fhir_support
+
 from cumulus_etl import cli_utils, common, errors, fhir, store
 from cumulus_etl.loaders import base
 from cumulus_etl.loaders.fhir.bulk_export import BulkExporter
@@ -37,7 +39,16 @@ def __init__(
         self.until = until
         self.resume = resume

-    async def load_all(self, resources: list[str]) -> base.LoaderResults:
+    async def detect_resources(self) -> set[str] | None:
+        if self.root.protocol in {"http", "https"}:
+            return None  # we haven't done the export yet!
+
+        found_files = cumulus_fhir_support.list_multiline_json_in_dir(
+            self.root.path, fsspec_fs=self.root.fs
+        )
+        return {resource for resource in found_files.values() if resource}
+
+    async def load_resources(self, resources: set[str]) -> base.LoaderResults:
         # Are we doing a bulk FHIR export from a server?
         if self.root.protocol in ["http", "https"]:
             bulk_dir = await self.load_from_bulk_export(resources)
@@ -61,14 +72,14 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:
         # TemporaryDirectory gets discarded), but that seems reasonable.
         print("Copying ndjson input files…")
         tmpdir = tempfile.TemporaryDirectory()
-        filenames = common.ls_resources(input_root, set(resources), warn_if_empty=True)
+        filenames = common.ls_resources(input_root, resources, warn_if_empty=True)
         for filename in filenames:
             input_root.get(filename, f"{tmpdir.name}/")

         return self.read_loader_results(input_root, tmpdir)

     async def load_from_bulk_export(
-        self, resources: list[str], prefer_url_resources: bool = False
+        self, resources: set[str], prefer_url_resources: bool = False
     ) -> common.Directory:
         """
         Performs a bulk export and drops the results in an export dir.
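
Detection leans on `cumulus_fhir_support.list_multiline_json_in_dir()`, which maps each ndjson file it finds to the FHIR resource type it contains (or None if the type can't be determined), hence the falsy filter in the set comprehension. A small illustration with invented folder contents:

```python
import cumulus_fhir_support

# Suppose /tmp/export holds Patient.ndjson, Encounter.001.ndjson, and notes.txt
found_files = cumulus_fhir_support.list_multiline_json_in_dir("/tmp/export")
# e.g. {"/tmp/export/Patient.ndjson": "Patient",
#       "/tmp/export/Encounter.001.ndjson": "Encounter"}

detected = {resource for resource in found_files.values() if resource}
print(detected)  # {"Patient", "Encounter"}
```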

cumulus_etl/loaders/i2b2/loader.py

Lines changed: 24 additions & 4 deletions

@@ -34,7 +34,27 @@ def __init__(self, root: store.Root, export_to: str | None = None):
         super().__init__(root)
         self.export_to = export_to

-    async def load_all(self, resources: list[str]) -> base.LoaderResults:
+    async def detect_resources(self) -> set[str] | None:
+        if self.root.protocol in {"tcp"}:
+            return None  # we haven't done the export yet!
+
+        filenames = {
+            "observation_fact_diagnosis.csv": "Condition",
+            "observation_fact_lab_views.csv": "Observation",
+            "observation_fact_medications.csv": "MedicationRequest",
+            "observation_fact_notes.csv": "DocumentReference",
+            "observation_fact_vitals.csv": "Observation",
+            "patient_dimension.csv": "Patient",
+            "visit_dimension.csv": "Encounter",
+        }
+
+        return {
+            resource
+            for path, resource in filenames.items()
+            if self.root.exists(self.root.joinpath(path))
+        }
+
+    async def load_resources(self, resources: set[str]) -> base.LoaderResults:
         if self.root.protocol in ["tcp"]:
             directory = self._load_all_from_oracle(resources)
         else:
@@ -43,7 +63,7 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:

     def _load_all_with_extractors(
         self,
-        resources: list[str],
+        resources: set[str],
         conditions: I2b2ExtractorCallable,
         lab_views: I2b2ExtractorCallable,
         medicationrequests: I2b2ExtractorCallable,
@@ -139,7 +159,7 @@ def _loop(
     #
     ###################################################################################################################

-    def _load_all_from_csv(self, resources: list[str]) -> common.Directory:
+    def _load_all_from_csv(self, resources: set[str]) -> common.Directory:
         path = self.root.path
         return self._load_all_with_extractors(
             resources,
@@ -177,7 +197,7 @@ def _load_all_from_csv(self, resources: list[str]) -> common.Directory:
     #
     ###################################################################################################################

-    def _load_all_from_oracle(self, resources: list[str]) -> common.Directory:
+    def _load_all_from_oracle(self, resources: set[str]) -> common.Directory:
         path = self.root.path
         return self._load_all_with_extractors(
             resources,
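
So detection just probes for the well-known i2b2 export filenames. For a folder that only holds patient and visit dumps, a run would scope down accordingly (a sketch; the folder path is invented):

```python
import asyncio

from cumulus_etl import store
from cumulus_etl.loaders.i2b2.loader import I2b2Loader

# Suppose /data/i2b2 contains only patient_dimension.csv and visit_dimension.csv
i2b2 = I2b2Loader(store.Root("/data/i2b2"))
print(asyncio.run(i2b2.detect_resources()))  # {"Patient", "Encounter"}
```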

cumulus_etl/upload_notes/downloader.py

Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ async def download_docrefs_from_fhir_server(
     else:
         # else we'll download the entire target path as a bulk export (presumably the user has scoped a Group)
         ndjson_loader = loaders.FhirNdjsonLoader(root_input, client, export_to=export_to)
-        return await ndjson_loader.load_all(["DocumentReference"])
+        return await ndjson_loader.load_resources({"DocumentReference"})


 async def _download_docrefs_from_fake_ids(
