| 
 | 1 | +"""  | 
 | 2 | +Initializes basic resource tables.  | 
 | 3 | +
  | 
 | 4 | +Creates the tables if they don't exist and pushes up a basic schema.  | 
 | 5 | +"""  | 
 | 6 | + | 
 | 7 | +import argparse  | 
 | 8 | +from collections.abc import Iterable  | 
 | 9 | + | 
 | 10 | +from cumulus_etl import cli_utils, formats, store  | 
 | 11 | +from cumulus_etl.etl import tasks  | 
 | 12 | +from cumulus_etl.etl.tasks import task_factory  | 
 | 13 | + | 
 | 14 | + | 
 | 15 | +def define_init_parser(parser: argparse.ArgumentParser) -> None:  | 
 | 16 | +    parser.usage = "%(prog)s [OPTION]... OUTPUT"  | 
 | 17 | +    parser.description = (  | 
 | 18 | +        "Initialize all basic output tables. "  | 
 | 19 | +        "After this command is run, you will be ready to set up Cumulus Library. "  | 
 | 20 | +        "This command is safe to run multiple times on the same folder, "  | 
 | 21 | +        "or even on an existing folder with data already in it."  | 
 | 22 | +    )  | 
 | 23 | + | 
 | 24 | +    parser.add_argument("dir_output", metavar="/path/to/output")  | 
 | 25 | +    cli_utils.add_output_format(parser)  | 
 | 26 | + | 
 | 27 | +    cli_utils.add_aws(parser)  | 
 | 28 | + | 
 | 29 | + | 
 | 30 | +def get_task_tables() -> Iterable[tuple[type[tasks.EtlTask], tasks.OutputTable]]:  | 
 | 31 | +    for task_class in task_factory.get_default_tasks():  | 
 | 32 | +        for output in task_class.outputs:  | 
 | 33 | +            if not output.get_name(task_class).startswith("etl__"):  | 
 | 34 | +                yield task_class, output  | 
 | 35 | + | 
 | 36 | + | 
 | 37 | +async def init_main(args: argparse.Namespace) -> None:  | 
 | 38 | +    """Main logic for initialization"""  | 
 | 39 | +    # record filesystem options like --s3-region before creating Roots  | 
 | 40 | +    store.set_user_fs_options(vars(args))  | 
 | 41 | + | 
 | 42 | +    output_root = store.Root(args.dir_output)  | 
 | 43 | + | 
 | 44 | +    with cli_utils.make_progress_bar() as progress:  | 
 | 45 | +        # Set up progress bar  | 
 | 46 | +        total_steps = len(list(get_task_tables())) + 1  # extra 1 is initializing the formatter  | 
 | 47 | +        task = progress.add_task("Initializing tables", total=total_steps)  | 
 | 48 | + | 
 | 49 | +        # Initialize formatter (which can take a moment with deltalake)  | 
 | 50 | +        format_class = formats.get_format_class(args.output_format)  | 
 | 51 | +        format_class.initialize_class(output_root)  | 
 | 52 | +        progress.update(task, advance=1)  | 
 | 53 | + | 
 | 54 | +        # Create an empty JobConfig/ folder, so that the 'convert' command will recognize this  | 
 | 55 | +        # folder as an ETL folder.  | 
 | 56 | +        output_root.makedirs(output_root.joinpath("JobConfig"))  | 
 | 57 | + | 
 | 58 | +        # Now iterate through, pushing to each output table  | 
 | 59 | +        for task_class, output in get_task_tables():  | 
 | 60 | +            batch = task_class.make_batch_from_rows(output.get_resource_type(task_class), [])  | 
 | 61 | +            formatter = format_class(output_root, output.get_name(task_class))  | 
 | 62 | +            formatter.write_records(batch)  | 
 | 63 | +            progress.update(task, advance=1)  | 
 | 64 | + | 
 | 65 | + | 
 | 66 | +async def run_init(parser: argparse.ArgumentParser, argv: list[str]) -> None:  | 
 | 67 | +    """Parse arguments and do the work"""  | 
 | 68 | +    define_init_parser(parser)  | 
 | 69 | +    args = parser.parse_args(argv)  | 
 | 70 | +    await init_main(args)  | 
0 commit comments