Skip to content

Commit ab6caa3

Browse files
authored
Merge pull request #353 from smart-on-fhir/mikix/extra-completion-info
Add ETL timestamp to completion table, make it non-unique
2 parents 98e2b8c + 5a43957 commit ab6caa3

35 files changed, with 56 additions and 37 deletions

cumulus_etl/completion/schema.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ def completion_format_args() -> dict:
1313
"""Returns kwargs to pass to the Format class initializer of your choice"""
1414
return {
1515
"dbname": COMPLETION_TABLE,
16-
"uniqueness_fields": {"table_name", "group_name"},
16+
# These fields altogether basically guarantee that we never collide.
17+
# (i.e. that every 'merge' is really an 'insert')
18+
# That's intentional - we want this table to be a bit of a historical log.
19+
# (We can't leave the uniqueness fields empty -- Delta Lake doesn't like that.)
20+
"uniqueness_fields": {"table_name", "group_name", "export_time", "etl_time"},
1721
}
1822

1923

@@ -47,6 +51,8 @@ def completion_schema() -> pyarrow.Schema:
4751
pyarrow.field("export_time", pyarrow.string()),
4852
pyarrow.field("export_url", pyarrow.string()),
4953
pyarrow.field("etl_version", pyarrow.string()),
54+
# See note above for why this isn't a pyarrow.timestamp() field.
55+
pyarrow.field("etl_time", pyarrow.string()),
5056
]
5157
)
5258

cumulus_etl/etl/config.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def __init__(
4444
self._output_format = output_format
4545
self.dir_errors = dir_errors
4646
self.client = client
47-
self.timestamp = common.timestamp_filename(timestamp)
47+
self.timestamp = timestamp
4848
self.hostname = gethostname()
4949
self.comment = comment or ""
5050
self.batch_size = batch_size
@@ -67,7 +67,8 @@ def path_config(self) -> str:
6767
return os.path.join(self.dir_job_config(), "job_config.json")
6868

6969
def dir_job_config(self) -> str:
70-
path = self._output_root.joinpath(f"JobConfig/{self.timestamp}")
70+
timestamp_dir = common.timestamp_filename(self.timestamp)
71+
path = self._output_root.joinpath(f"JobConfig/{timestamp_dir}")
7172
self._output_root.makedirs(path)
7273
return path
7374

cumulus_etl/etl/tasks/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ def _update_completion_table(self) -> None:
272272
"export_time": self.task_config.export_datetime.isoformat(),
273273
"export_url": self.task_config.export_url,
274274
"etl_version": cumulus_etl.__version__,
275+
"etl_time": self.task_config.timestamp.isoformat(),
275276
}
276277
for output in self.outputs
277278
if not output.get_name(self).startswith("etl__")
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"table_name": "covid_symptom__nlp_results", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
1+
{"table_name": "covid_symptom__nlp_results", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"table_name": "covid_symptom__nlp_results_term_exists", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
1+
{"table_name": "covid_symptom__nlp_results_term_exists", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"table_name": "hftest__summary", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
1+
{"table_name": "hftest__summary", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
1+
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
1+
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
1+
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
1+
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}

0 commit comments

Comments (0)