From 453df68937332264910392df24d281d4bc3b96a0 Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Wed, 22 Oct 2025 10:44:10 -0400 Subject: [PATCH] init: create skeleton completion tables too (in addition to normal resource tables) The kidney study was expecting the encounter completion table. Might as well help it along. --- cumulus_etl/etl/init/cli.py | 30 ++++++++++++++++++++---------- tests/init/test_init_cli.py | 2 ++ 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/cumulus_etl/etl/init/cli.py b/cumulus_etl/etl/init/cli.py index 705916a1..65f12461 100644 --- a/cumulus_etl/etl/init/cli.py +++ b/cumulus_etl/etl/init/cli.py @@ -5,10 +5,11 @@ """ import argparse -from collections.abc import Iterable +from collections.abc import Generator -from cumulus_etl import cli_utils, formats, store -from cumulus_etl.etl import tasks +import pyarrow + +from cumulus_etl import cli_utils, completion, formats, store from cumulus_etl.etl.tasks import task_factory @@ -27,11 +28,21 @@ def define_init_parser(parser: argparse.ArgumentParser) -> None: cli_utils.add_aws(parser) -def get_task_tables() -> Iterable[tuple[type[tasks.EtlTask], tasks.OutputTable]]: +def get_task_tables() -> Generator[tuple[dict, pyarrow.Schema]]: + """Returns (formatter kwargs, table_schema)""" for task_class in task_factory.get_default_tasks(): for output in task_class.outputs: - if not output.get_name(task_class).startswith("etl__"): - yield task_class, output + kwargs = { + "dbname": output.get_name(task_class), + "group_field": output.group_field, + "uniqueness_fields": output.uniqueness_fields, + "update_existing": output.update_existing, + } + schema = task_class.get_schema(output.get_resource_type(task_class), []) + yield kwargs, schema + + # Add the general ETL completion table, just to be nice to any possible consumers + yield completion.completion_format_args(), completion.completion_schema() async def init_main(args: argparse.Namespace) -> None: @@ -56,10 +67,9 @@ async def init_main(args: argparse.Namespace) -> None: output_root.makedirs(output_root.joinpath("JobConfig")) # Now iterate through, pushing to each output table - for task_class, output in get_task_tables(): - batch = task_class.make_batch_from_rows(output.get_resource_type(task_class), []) - formatter = format_class(output_root, output.get_name(task_class)) - formatter.write_records(batch) + for format_kwargs, schema in get_task_tables(): + formatter = format_class(output_root, **format_kwargs) + formatter.write_records(formats.Batch([], schema=schema)) progress.update(task, advance=1) diff --git a/tests/init/test_init_cli.py b/tests/init/test_init_cli.py index f7657694..f48cdfe3 100644 --- a/tests/init/test_init_cli.py +++ b/tests/init/test_init_cli.py @@ -34,6 +34,8 @@ async def test_happy_path(self): self.assertIn("patient", dirs) self.assertIn("medicationrequest", dirs) self.assertIn("medication", dirs) # secondary table + self.assertIn("etl__completion_encounters", dirs) # custom ETL table + self.assertIn("etl__completion", dirs) # another custom ETL table self.assertIn("JobConfig", dirs) # so that the dir is flagged as an ETL dir by 'convert' # Are folder contents what we expect?