Commit 4089477

Add export URL & ETL version into completion table

This should help track how we exported and processed the data.

1 parent 6c62a53

Note: large commits have some content hidden by default, so not every changed file appears below.

42 files changed (+106, -56 lines)

cumulus_etl/completion/schema.py

Lines changed: 2 additions & 0 deletions

@@ -45,6 +45,8 @@ def completion_schema() -> pyarrow.Schema:
             # datetime) would then require conversion to and fro, it's easier to
             # just mirror our FHIR tables and use strings for timestamps.
             pyarrow.field("export_time", pyarrow.string()),
+            pyarrow.field("export_url", pyarrow.string()),
+            pyarrow.field("etl_version", pyarrow.string()),
         ]
     )
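
To make the resulting shape concrete, here is a minimal sketch of a schema with the two new columns and a row written against it. This is not the real completion_schema() -- the field list is abridged and every value is invented:

import pyarrow

# Abridged sketch of a completion schema; only shows where the two new
# string columns sit alongside the existing ones:
schema = pyarrow.schema(
    [
        pyarrow.field("table_name", pyarrow.string()),
        pyarrow.field("group_name", pyarrow.string()),
        pyarrow.field("export_time", pyarrow.string()),
        pyarrow.field("export_url", pyarrow.string()),
        pyarrow.field("etl_version", pyarrow.string()),
    ]
)

# A row written against it (all values are placeholders):
table = pyarrow.Table.from_pylist(
    [
        {
            "table_name": "patient",
            "group_name": "my-group",
            "export_time": "2020-10-13T12:00:20-05:00",
            "export_url": "https://example.com/fhir/Group/my-group/$export",
            "etl_version": "1.0.0",
        }
    ],
    schema=schema,
)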

cumulus_etl/etl/cli.py

Lines changed: 1 addition & 0 deletions

@@ -296,6 +296,7 @@ async def etl_main(args: argparse.Namespace) -> None:
         tasks=[t.name for t in selected_tasks],
         export_group_name=export_group_name,
         export_datetime=export_datetime,
+        export_url=loader_results.export_url,
         deleted_ids=loader_results.deleted_ids,
     )
     common.write_json(config.path_config(), config.as_json(), indent=4)

cumulus_etl/etl/config.py

Lines changed: 3 additions & 0 deletions

@@ -33,6 +33,7 @@ def __init__(
         tasks: list[str] | None = None,
         export_group_name: str | None = None,
         export_datetime: datetime.datetime | None = None,
+        export_url: str | None = None,
         deleted_ids: dict[str, set[str]] | None = None,
     ):
         self._dir_input_orig = dir_input_orig
@@ -51,6 +52,7 @@ def __init__(
         self.tasks = tasks or []
         self.export_group_name = export_group_name
         self.export_datetime = export_datetime
+        self.export_url = export_url
         self.deleted_ids = deleted_ids or {}

         # initialize format class
@@ -82,6 +84,7 @@ def as_json(self):
             "tasks": ",".join(self.tasks),
             "export_group_name": self.export_group_name,
             "export_timestamp": self.export_datetime and self.export_datetime.isoformat(),
+            "export_url": self.export_url,
         }
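
For illustration, the JobConfig JSON that etl_main() writes (see cli.py above) now carries the export URL too. A hypothetical, abridged as_json() result, with every value invented:

{
    "tasks": "patient,condition",
    "export_group_name": "my-group",
    "export_timestamp": "2020-10-13T12:00:20-05:00",
    "export_url": "https://example.com/fhir/Group/my-group/$export"
}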

cumulus_etl/etl/tasks/base.py

Lines changed: 3 additions & 0 deletions

@@ -13,6 +13,7 @@
 import rich.table
 import rich.text

+import cumulus_etl
 from cumulus_etl import cli_utils, common, completion, deid, formats, store
 from cumulus_etl.etl import config
 from cumulus_etl.etl.tasks import batching
@@ -272,6 +273,8 @@ def _update_completion_table(self) -> None:
                 "table_name": output.get_name(self),
                 "group_name": self.task_config.export_group_name,
                 "export_time": self.task_config.export_datetime.isoformat(),
+                "export_url": self.task_config.export_url,
+                "etl_version": cumulus_etl.__version__,
             }
             for output in self.outputs
             if not output.get_name(self).startswith("etl__")
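
A small sketch of the stamping this enables, standing in for the real _update_completion_table (the datetime and URL are invented; cumulus_etl.__version__ is the same attribute the import above provides):

import datetime

import cumulus_etl  # exposes __version__, per the import added above

# Stand-ins for the self.task_config values used in the real method:
export_datetime = datetime.datetime(2020, 10, 13, 12, 0, 20, tzinfo=datetime.timezone.utc)

row = {
    "export_time": export_datetime.isoformat(),  # "2020-10-13T12:00:20+00:00"
    "export_url": "https://example.com/fhir/Group/my-group/$export",
    "etl_version": cumulus_etl.__version__,  # records which ETL build wrote the row
}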

cumulus_etl/loaders/base.py

Lines changed: 1 addition & 0 deletions

@@ -22,6 +22,7 @@ def path(self) -> str:
     # and the time when it was exported ("transactionTime" in bulk-export terms).
     group_name: str | None = None
     export_datetime: datetime.datetime | None = None
+    export_url: str | None = None

     # A list of resource IDs that should be deleted from the output tables.
     # This is a map of resource -> set of IDs like {"Patient": {"A", "B"}}

cumulus_etl/loaders/fhir/bulk_export.py

Lines changed: 1 addition & 0 deletions

@@ -75,6 +75,7 @@ def __init__(
         # Public properties, to be read after the export:
         self.export_datetime = None
         self.group_name = fhir.parse_group_from_url(self._url)
+        self.export_url = self._url

     def format_kickoff_url(
         self,
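
Note the division of labor: group_name holds only the Group ID parsed out of the kickoff URL, while the new export_url keeps the URL wholesale. A sketch under the assumption that fhir.parse_group_from_url extracts the Group segment (an inference from its name and usage, not its actual code; the URL is invented):

url = "https://example.com/fhir/Group/my-group/$export"

# Presumed equivalent of fhir.parse_group_from_url(url) -- an assumption:
group_name = url.split("/Group/")[1].split("/")[0]
export_url = url  # the new field records the full URL as-is

assert group_name == "my-group"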

cumulus_etl/loaders/fhir/export_log.py

Lines changed: 2 additions & 0 deletions

@@ -41,6 +41,7 @@ class NoLogs(LogParsingError):
     def __init__(self, root: store.Root):
         self.group_name: str = None
         self.export_datetime: datetime.datetime = None
+        self.export_url: str = None

         self._parse(root, self._find(root))

@@ -67,6 +68,7 @@ def _parse(self, root: store.Root, path: str) -> None:
     def _parse_kickoff(self, row: dict) -> None:
         details = row["eventDetail"]
         self.group_name = fhir.parse_group_from_url(details["exportUrl"])
+        self.export_url = details["exportUrl"]

     def _parse_status_complete(self, row: dict) -> None:
         details = row["eventDetail"]
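
For context, a hypothetical kickoff log row showing just the detail that _parse_kickoff touches. The shape is inferred from the code above (the "eventId" key and the URL are assumptions):

row = {
    "eventId": "kickoff",
    "eventDetail": {
        "exportUrl": "https://example.com/fhir/Group/my-group/$export",
    },
}

details = row["eventDetail"]
export_url = details["exportUrl"]  # what _parse_kickoff now records verbatim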

cumulus_etl/loaders/fhir/ndjson_loader.py

Lines changed: 25 additions & 23 deletions

@@ -40,30 +40,16 @@ def __init__(
     async def load_all(self, resources: list[str]) -> base.LoaderResults:
         # Are we doing a bulk FHIR export from a server?
         if self.root.protocol in ["http", "https"]:
-            results = await self.load_from_bulk_export(resources)
-            input_root = store.Root(results.path)
+            bulk_dir = await self.load_from_bulk_export(resources)
+            input_root = store.Root(bulk_dir.name)
         else:
             if self.export_to or self.since or self.until or self.resume:
                 errors.fatal(
                     "You provided FHIR bulk export parameters but did not provide a FHIR server",
                     errors.ARGS_CONFLICT,
                 )
-
-            results = base.LoaderResults(directory=self.root.path)
             input_root = self.root

-        # Parse logs for export information
-        try:
-            parser = BulkExportLogParser(input_root)
-            results.group_name = parser.group_name
-            results.export_datetime = parser.export_datetime
-        except BulkExportLogParser.LogParsingError:
-            # Once we require group name & export datetime, we should warn about this.
-            # For now, just ignore any errors.
-            pass
-
-        results.deleted_ids = self.read_deleted_ids(input_root)
-
         # Copy the resources we need from the remote directory (like S3 buckets) to a local one.
         #
         # We do this even if the files are local, because the next step in our pipeline is the MS deid tool,
@@ -78,13 +64,12 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:
         filenames = common.ls_resources(input_root, set(resources), warn_if_empty=True)
         for filename in filenames:
             input_root.get(filename, f"{tmpdir.name}/")
-        results.directory = tmpdir

-        return results
+        return self.read_loader_results(input_root, tmpdir)

     async def load_from_bulk_export(
         self, resources: list[str], prefer_url_resources: bool = False
-    ) -> base.LoaderResults:
+    ) -> common.Directory:
         """
         Performs a bulk export and drops the results in an export dir.

@@ -109,12 +94,29 @@ async def load_from_bulk_export(
         except errors.FatalError as exc:
             errors.fatal(str(exc), errors.BULK_EXPORT_FAILED)

-        return base.LoaderResults(
-            directory=target_dir,
-            group_name=bulk_exporter.group_name,
-            export_datetime=bulk_exporter.export_datetime,
+        return target_dir
+
+    def read_loader_results(
+        self, input_root: store.Root, results_dir: common.Directory
+    ) -> base.LoaderResults:
+        results = base.LoaderResults(
+            directory=results_dir,
+            deleted_ids=self.read_deleted_ids(input_root),
         )

+        # Parse logs for export information
+        try:
+            parser = BulkExportLogParser(input_root)
+            results.group_name = parser.group_name
+            results.export_datetime = parser.export_datetime
+            results.export_url = parser.export_url
+        except BulkExportLogParser.LogParsingError:
+            # Once we require group name & export datetime, we should warn about this.
+            # For now, just ignore any errors.
+            pass
+
+        return results
+
     def read_deleted_ids(self, root: store.Root) -> dict[str, set[str]]:
         """
         Reads any deleted IDs that a bulk export gave us.
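
The net effect of this refactor: load_all() no longer threads a half-built LoaderResults through the export step, and all metadata reading happens once, at the end, in read_loader_results(). A condensed, hypothetical rendering of the new flow (copy_to_local_tmpdir is an invented stand-in for the ls_resources/get loop above):

async def load_all_sketch(loader, resources: list[str]):
    if loader.root.protocol in ("http", "https"):
        # Bulk export now returns just a directory, no metadata:
        bulk_dir = await loader.load_from_bulk_export(resources)
        input_root = store.Root(bulk_dir.name)
    else:
        input_root = loader.root

    tmpdir = copy_to_local_tmpdir(input_root, resources)  # invented helper

    # Logs, export URL, and deleted IDs are read in one place, whether the
    # files came from a server or from local disk:
    return loader.read_loader_results(input_root, tmpdir)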
(file name hidden)

Lines changed: 1 addition & 1 deletion

@@ -1 +1 @@
-{"table_name": "covid_symptom__nlp_results", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
+{"table_name": "covid_symptom__nlp_results", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}

(file name hidden)

Lines changed: 1 addition & 1 deletion

@@ -1 +1 @@
-{"table_name": "covid_symptom__nlp_results_term_exists", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
+{"table_name": "covid_symptom__nlp_results_term_exists", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
