55"""
66
77import argparse
8- from collections .abc import Iterable
8+ from collections .abc import Generator
99
10- from cumulus_etl import cli_utils , formats , store
11- from cumulus_etl .etl import tasks
10+ import pyarrow
11+
12+ from cumulus_etl import cli_utils , completion , formats , store
1213from cumulus_etl .etl .tasks import task_factory
1314
1415
@@ -27,11 +28,21 @@ def define_init_parser(parser: argparse.ArgumentParser) -> None:
2728 cli_utils .add_aws (parser )
2829
2930
30- def get_task_tables () -> Iterable [tuple [type [tasks .EtlTask ], tasks .OutputTable ]]:
31+ def get_task_tables () -> Generator [tuple [dict , pyarrow .Schema ]]:
32+ """Returns (formatter kwargs, table_schema)"""
3133 for task_class in task_factory .get_default_tasks ():
3234 for output in task_class .outputs :
33- if not output .get_name (task_class ).startswith ("etl__" ):
34- yield task_class , output
35+ kwargs = {
36+ "dbname" : output .get_name (task_class ),
37+ "group_field" : output .group_field ,
38+ "uniqueness_fields" : output .uniqueness_fields ,
39+ "update_existing" : output .update_existing ,
40+ }
41+ schema = task_class .get_schema (output .get_resource_type (task_class ), [])
42+ yield kwargs , schema
43+
44+ # Add the general ETL completion table, just to be nice to any possible consumers
45+ yield completion .completion_format_args (), completion .completion_schema ()
3546
3647
3748async def init_main (args : argparse .Namespace ) -> None :
@@ -56,10 +67,9 @@ async def init_main(args: argparse.Namespace) -> None:
5667 output_root .makedirs (output_root .joinpath ("JobConfig" ))
5768
5869 # Now iterate through, pushing to each output table
59- for task_class , output in get_task_tables ():
60- batch = task_class .make_batch_from_rows (output .get_resource_type (task_class ), [])
61- formatter = format_class (output_root , output .get_name (task_class ))
62- formatter .write_records (batch )
70+ for format_kwargs , schema in get_task_tables ():
71+ formatter = format_class (output_root , ** format_kwargs )
72+ formatter .write_records (formats .Batch ([], schema = schema ))
6373 progress .update (task , advance = 1 )
6474
6575
0 commit comments