diff --git a/src/vcspull/cli/__init__.py b/src/vcspull/cli/__init__.py index a4d2d303..bbacdd82 100644 --- a/src/vcspull/cli/__init__.py +++ b/src/vcspull/cli/__init__.py @@ -4,6 +4,7 @@ import argparse import logging +import pathlib import textwrap import typing as t from typing import overload @@ -13,6 +14,8 @@ from vcspull.__about__ import __version__ from vcspull.log import setup_logger +from .add import add_repo, create_add_subparser +from .add_from_fs import add_from_filesystem, create_add_from_fs_subparser from .sync import create_sync_subparser, sync log = logging.getLogger(__name__) @@ -73,14 +76,33 @@ def create_parser( ) create_sync_subparser(sync_parser) + add_parser = subparsers.add_parser( + "add", + help="add a repository to the configuration", + formatter_class=argparse.RawDescriptionHelpFormatter, + description="Add a repository to the vcspull configuration file.", + ) + create_add_subparser(add_parser) + + add_from_fs_parser = subparsers.add_parser( + "add-from-fs", + help="scan filesystem for git repositories and add them to the configuration", + formatter_class=argparse.RawDescriptionHelpFormatter, + description="Scan a directory for git repositories and add them to the " + "vcspull configuration file.", + ) + create_add_from_fs_subparser(add_from_fs_parser) + if return_subparsers: - return parser, sync_parser + # Return all parsers needed by cli() function + return parser, (sync_parser, add_parser, add_from_fs_parser) return parser def cli(_args: list[str] | None = None) -> None: """CLI entry point for vcspull.""" - parser, sync_parser = create_parser(return_subparsers=True) + parser, subparsers = create_parser(return_subparsers=True) + sync_parser, _add_parser, _add_from_fs_parser = subparsers args = parser.parse_args(_args) setup_logger(log=log, level=args.log_level.upper()) @@ -90,8 +112,34 @@ def cli(_args: list[str] | None = None) -> None: return if args.subparser_name == "sync": sync( - repo_patterns=args.repo_patterns, - config=args.config, - exit_on_error=args.exit_on_error, + repo_patterns=args.repo_patterns if hasattr(args, "repo_patterns") else [], + config=( + pathlib.Path(args.config) + if hasattr(args, "config") and args.config + else None + ), + exit_on_error=args.exit_on_error + if hasattr(args, "exit_on_error") + else False, parser=sync_parser, ) + elif args.subparser_name == "add": + add_repo_kwargs = { + "name": args.name, + "url": args.url, + "config_file_path_str": args.config if hasattr(args, "config") else None, + "path": args.path if hasattr(args, "path") else None, + "base_dir": args.base_dir if hasattr(args, "base_dir") else None, + } + add_repo(**add_repo_kwargs) + elif args.subparser_name == "add-from-fs": + add_from_fs_kwargs = { + "scan_dir_str": args.scan_dir, + "config_file_path_str": args.config if hasattr(args, "config") else None, + "recursive": args.recursive if hasattr(args, "recursive") else False, + "base_dir_key_arg": args.base_dir_key + if hasattr(args, "base_dir_key") + else None, + "yes": args.yes if hasattr(args, "yes") else False, + } + add_from_filesystem(**add_from_fs_kwargs) diff --git a/src/vcspull/cli/add.py b/src/vcspull/cli/add.py new file mode 100644 index 00000000..d23dc03e --- /dev/null +++ b/src/vcspull/cli/add.py @@ -0,0 +1,183 @@ +"""Add repository functionality for vcspull.""" + +from __future__ import annotations + +import logging +import pathlib +import typing as t + +import yaml +from colorama import Fore, Style + +from vcspull.config import find_home_config_files, save_config_yaml + +if t.TYPE_CHECKING: + 
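# --- example: driving the new subcommands (annotation, not part of the patch) ---
# `cli()` accepts an argv-style list, so the new `add` and `add-from-fs`
# commands can be exercised programmatically exactly as from the shell. The
# repo name, URL, and directories below are illustrative assumptions.
import pathlib

from vcspull.cli import cli

# shell: vcspull add myproject https://github.com/user/myproject.git --dir '~/projects/'
cli(
    [
        "add",
        "myproject",
        "https://github.com/user/myproject.git",
        "--dir",
        "~/projects/",
    ],
)

# shell: vcspull add-from-fs ~/code --recursive --yes
cli(["add-from-fs", str(pathlib.Path.home() / "code"), "--recursive", "--yes"])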
import argparse + +log = logging.getLogger(__name__) + + +def create_add_subparser(parser: argparse.ArgumentParser) -> None: + """Create ``vcspull add`` argument subparser.""" + parser.add_argument( + "-c", + "--config", + dest="config", + metavar="file", + help="path to custom config file (default: .vcspull.yaml or ~/.vcspull.yaml)", + ) + parser.add_argument( + "name", + help="Name for the repository in the config", + ) + parser.add_argument( + "url", + help="Repository URL (e.g., https://github.com/user/repo.git)", + ) + parser.add_argument( + "--path", + dest="path", + help="Local directory path where repo will be cloned " + "(determines base directory key if not specified with --dir)", + ) + parser.add_argument( + "--dir", + dest="base_dir", + help="Base directory key in config (e.g., '~/projects/'). " + "If not specified, will be inferred from --path or use current directory.", + ) + + +def add_repo( + name: str, + url: str, + config_file_path_str: str | None, + path: str | None, + base_dir: str | None, +) -> None: + """Add a repository to the vcspull configuration. + + Parameters + ---------- + name : str + Repository name for the config + url : str + Repository URL + config_file_path_str : str | None + Path to config file, or None to use default + path : str | None + Local path where repo will be cloned + base_dir : str | None + Base directory key to use in config + """ + # Determine config file + config_file_path: pathlib.Path + if config_file_path_str: + config_file_path = pathlib.Path(config_file_path_str).expanduser().resolve() + else: + home_configs = find_home_config_files(filetype=["yaml"]) + if not home_configs: + config_file_path = pathlib.Path.cwd() / ".vcspull.yaml" + log.info( + f"No config specified and no default found, will create at " + f"{config_file_path}", + ) + elif len(home_configs) > 1: + log.error( + "Multiple home config files found, please specify one with -c/--config", + ) + return + else: + config_file_path = home_configs[0] + + # Load existing config + raw_config: dict[str, t.Any] = {} + if config_file_path.exists() and config_file_path.is_file(): + try: + with config_file_path.open(encoding="utf-8") as f: + raw_config = yaml.safe_load(f) or {} + if not isinstance(raw_config, dict): + log.error( + f"Config file {config_file_path} is not a valid YAML dictionary. " + "Aborting.", + ) + return + except Exception: + log.exception(f"Error loading YAML from {config_file_path}. Aborting.") + if log.isEnabledFor(logging.DEBUG): + import traceback + + traceback.print_exc() + return + else: + log.info( + f"Config file {config_file_path} not found. A new one will be created.", + ) + + # Determine base directory key + if base_dir: + # Use explicit base directory + base_dir_key = base_dir if base_dir.endswith("/") else base_dir + "/" + elif path: + # Infer from provided path + repo_path = pathlib.Path(path).expanduser().resolve() + try: + # Try to make it relative to home + base_dir_key = "~/" + str(repo_path.relative_to(pathlib.Path.home())) + "/" + except ValueError: + # Use absolute path + base_dir_key = str(repo_path) + "/" + else: + # Default to current directory + base_dir_key = "./" + + # Ensure base directory key exists in config + if base_dir_key not in raw_config: + raw_config[base_dir_key] = {} + elif not isinstance(raw_config[base_dir_key], dict): + log.error( + f"Configuration section '{base_dir_key}' is not a dictionary. 
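# --- example: resulting config shape (annotation) ---
# With no --path/--dir, `add_repo` falls back to the "./" key and writes the
# verbose mapping form, so the file round-trips to a plain nested dict:
import yaml

config_text = """\
./:
  myproject:
    repo: git@github.com:user/myproject.git
"""
data = yaml.safe_load(config_text)
assert data["./"]["myproject"] == {"repo": "git@github.com:user/myproject.git"}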
Aborting.", + ) + return + + # Check if repo already exists + if name in raw_config[base_dir_key]: + existing_config = raw_config[base_dir_key][name] + # Handle both string and dict formats + current_url: str + if isinstance(existing_config, str): + current_url = existing_config + elif isinstance(existing_config, dict): + repo_value = existing_config.get("repo") + url_value = existing_config.get("url") + current_url = repo_value or url_value or "unknown" + else: + current_url = str(existing_config) + + log.warning( + f"Repository '{name}' already exists under '{base_dir_key}'. " + f"Current URL: {current_url}. " + f"To update, remove and re-add, or edit the YAML file manually.", + ) + return + + # Add the repository in verbose format + raw_config[base_dir_key][name] = {"repo": url} + + # Save config + try: + save_config_yaml(config_file_path, raw_config) + log.info( + f"{Fore.GREEN}✓{Style.RESET_ALL} Successfully added " + f"{Fore.CYAN}'{name}'{Style.RESET_ALL} " + f"({Fore.YELLOW}{url}{Style.RESET_ALL}) to " + f"{Fore.BLUE}{config_file_path}{Style.RESET_ALL} under " + f"'{Fore.MAGENTA}{base_dir_key}{Style.RESET_ALL}'.", + ) + except Exception: + log.exception(f"Error saving config to {config_file_path}") + if log.isEnabledFor(logging.DEBUG): + import traceback + + traceback.print_exc() + raise diff --git a/src/vcspull/cli/add_from_fs.py b/src/vcspull/cli/add_from_fs.py new file mode 100644 index 00000000..4325cac5 --- /dev/null +++ b/src/vcspull/cli/add_from_fs.py @@ -0,0 +1,331 @@ +"""Filesystem scanning functionality for vcspull.""" + +from __future__ import annotations + +import logging +import os +import pathlib +import subprocess +import typing as t + +import yaml +from colorama import Fore, Style + +from vcspull.config import expand_dir, find_home_config_files, save_config_yaml + +if t.TYPE_CHECKING: + import argparse + +log = logging.getLogger(__name__) + + +def get_git_origin_url(repo_path: pathlib.Path) -> str | None: + """Get the origin URL from a git repository. + + Parameters + ---------- + repo_path : pathlib.Path + Path to the git repository + + Returns + ------- + str | None + The origin URL if found, None otherwise + """ + try: + result = subprocess.run( + ["git", "config", "--get", "remote.origin.url"], + cwd=repo_path, + capture_output=True, + text=True, + check=True, + ) + return result.stdout.strip() + except (subprocess.CalledProcessError, FileNotFoundError) as e: + log.debug(f"Could not get origin URL for {repo_path}: {e}") + return None + + +def create_add_from_fs_subparser(parser: argparse.ArgumentParser) -> None: + """Create ``vcspull add-from-fs`` argument subparser.""" + parser.add_argument( + "-c", + "--config", + dest="config", + metavar="file", + help="path to custom config file (default: .vcspull.yaml or ~/.vcspull.yaml)", + ) + parser.add_argument( + "scan_dir", + nargs="?", + default=".", + help="Directory to scan for git repositories (default: current directory)", + ) + parser.add_argument( + "--recursive", + "-r", + action="store_true", + help="Scan directories recursively.", + ) + parser.add_argument( + "--base-dir-key", + help="Specify the top-level directory key from vcspull config " + "(e.g., '~/study/python/') under which to add these repos. 
" + "If not given, the normalized absolute path of scan_dir will be used as " + "the key.", + ) + parser.add_argument( + "--yes", + "-y", + action="store_true", + help="Automatically confirm additions without prompting.", + ) + + +def add_from_filesystem( + scan_dir_str: str, + config_file_path_str: str | None, + recursive: bool, + base_dir_key_arg: str | None, + yes: bool, +) -> None: + """Scan filesystem for git repositories and add to vcspull config. + + Parameters + ---------- + scan_dir_str : str + Directory to scan for git repositories + config_file_path_str : str | None + Path to config file, or None to use default + recursive : bool + Whether to scan subdirectories recursively + base_dir_key_arg : str | None + Base directory key to use in config (overrides automatic detection) + yes : bool + Whether to skip confirmation prompt + """ + scan_dir = expand_dir(pathlib.Path(scan_dir_str)) + + config_file_path: pathlib.Path + if config_file_path_str: + config_file_path = pathlib.Path(config_file_path_str).expanduser().resolve() + else: + home_configs = find_home_config_files(filetype=["yaml"]) + if not home_configs: + config_file_path = pathlib.Path.cwd() / ".vcspull.yaml" + log.info( + f"{Fore.CYAN}i{Style.RESET_ALL} No config specified and no default " + f"home config, will use/create " + f"{Fore.BLUE}{config_file_path}{Style.RESET_ALL}", + ) + elif len(home_configs) > 1: + log.error( + "Multiple home_config files found, please specify one with -c/--config", + ) + return + else: + config_file_path = home_configs[0] + + raw_config: dict[str, t.Any] = {} + if config_file_path.exists() and config_file_path.is_file(): + try: + with config_file_path.open(encoding="utf-8") as f: + raw_config = yaml.safe_load(f) or {} + if not isinstance(raw_config, dict): + log.error( + f"Config file {config_file_path} is not a valid YAML dictionary. " + "Aborting.", + ) + return + except Exception: + log.exception(f"Error loading YAML from {config_file_path}. Aborting.") + if log.isEnabledFor(logging.DEBUG): + import traceback + + traceback.print_exc() + return + else: + log.info( + f"{Fore.CYAN}i{Style.RESET_ALL} Config file " + f"{Fore.BLUE}{config_file_path}{Style.RESET_ALL} " + f"not found. A new one will be created.", + ) + + found_repos: list[ + tuple[str, str, str] + ] = [] # (repo_name, repo_url, determined_base_key) + + if recursive: + for root, dirs, _ in os.walk(scan_dir): + if ".git" in dirs: + repo_path = pathlib.Path(root) + repo_name = repo_path.name + repo_url = get_git_origin_url(repo_path) + + if not repo_url: + log.warning( + f"Could not determine remote URL for git repository at " + f"{repo_path}. Skipping.", + ) + continue + + determined_base_key: str + if base_dir_key_arg: + determined_base_key = ( + base_dir_key_arg + if base_dir_key_arg.endswith("/") + else base_dir_key_arg + "/" + ) + else: + try: + determined_base_key = ( + "~/" + str(scan_dir.relative_to(pathlib.Path.home())) + "/" + ) + except ValueError: + determined_base_key = str(scan_dir.resolve()) + "/" + + if not determined_base_key.endswith("/"): + determined_base_key += "/" + + found_repos.append((repo_name, repo_url, determined_base_key)) + else: + # Non-recursive: only check immediate subdirectories + for item in scan_dir.iterdir(): + if item.is_dir() and (item / ".git").is_dir(): + repo_name = item.name + repo_url = get_git_origin_url(item) + + if not repo_url: + log.warning( + f"Could not determine remote URL for git repository at " + f"{item}. 
Skipping.", + ) + continue + + if base_dir_key_arg: + determined_base_key = ( + base_dir_key_arg + if base_dir_key_arg.endswith("/") + else base_dir_key_arg + "/" + ) + else: + try: + determined_base_key = ( + "~/" + str(scan_dir.relative_to(pathlib.Path.home())) + "/" + ) + except ValueError: + determined_base_key = str(scan_dir.resolve()) + "/" + + if not determined_base_key.endswith("/"): + determined_base_key += "/" + + found_repos.append((repo_name, repo_url, determined_base_key)) + + if not found_repos: + log.info( + f"{Fore.YELLOW}!{Style.RESET_ALL} No git repositories found in " + f"{Fore.BLUE}{scan_dir}{Style.RESET_ALL}. Nothing to add.", + ) + return + + repos_to_add: list[tuple[str, str, str]] = [] + existing_repos: list[tuple[str, str, str]] = [] # (name, url, key) + + for name, url, key in found_repos: + target_section = raw_config.get(key, {}) + if isinstance(target_section, dict) and name in target_section: + existing_repos.append((name, url, key)) + else: + repos_to_add.append((name, url, key)) + + if existing_repos: + # Show summary only when there are many existing repos + if len(existing_repos) > 5: + log.info( + f"{Fore.YELLOW}!{Style.RESET_ALL} Found " + f"{Fore.CYAN}{len(existing_repos)}{Style.RESET_ALL} " + f"existing repositories already in configuration.", + ) + else: + # Show details only for small numbers + log.info( + f"{Fore.YELLOW}!{Style.RESET_ALL} Found " + f"{Fore.CYAN}{len(existing_repos)}{Style.RESET_ALL} " + f"existing repositories in configuration:", + ) + for name, url, key in existing_repos: + log.info( + f" {Fore.BLUE}•{Style.RESET_ALL} " + f"{Fore.CYAN}{name}{Style.RESET_ALL} " + f"({Fore.YELLOW}{url}{Style.RESET_ALL}) at " + f"{Fore.MAGENTA}{key}{name}{Style.RESET_ALL} " + f"in {Fore.BLUE}{config_file_path}{Style.RESET_ALL}", + ) + + if not repos_to_add: + if existing_repos: + log.info( + f"{Fore.GREEN}✓{Style.RESET_ALL} All found repositories already exist " + f"in the configuration. {Fore.GREEN}Nothing to do.{Style.RESET_ALL}", + ) + return + + # Show what will be added + log.info( + f"\n{Fore.GREEN}Found {len(repos_to_add)} new " + f"{'repository' if len(repos_to_add) == 1 else 'repositories'} " + f"to add:{Style.RESET_ALL}", + ) + for repo_name, repo_url, _determined_base_key in repos_to_add: + log.info( + f" {Fore.GREEN}+{Style.RESET_ALL} {Fore.CYAN}{repo_name}{Style.RESET_ALL} " + f"({Fore.YELLOW}{repo_url}{Style.RESET_ALL})", + ) + + if not yes: + confirm = input( + f"\n{Fore.CYAN}Add these repositories? [y/N]: {Style.RESET_ALL}", + ).lower() + if confirm not in {"y", "yes"}: + log.info(f"{Fore.RED}✗{Style.RESET_ALL} Aborted by user.") + return + + changes_made = False + for repo_name, repo_url, determined_base_key in repos_to_add: + if determined_base_key not in raw_config: + raw_config[determined_base_key] = {} + elif not isinstance(raw_config[determined_base_key], dict): + log.warning( + f"Section '{determined_base_key}' in config is not a dictionary. 
" + f"Skipping repo {repo_name}.", + ) + continue + + if repo_name not in raw_config[determined_base_key]: + raw_config[determined_base_key][repo_name] = {"repo": repo_url} + log.info( + f"{Fore.GREEN}+{Style.RESET_ALL} Adding " + f"{Fore.CYAN}'{repo_name}'{Style.RESET_ALL} " + f"({Fore.YELLOW}{repo_url}{Style.RESET_ALL}) under " + f"'{Fore.MAGENTA}{determined_base_key}{Style.RESET_ALL}'.", + ) + changes_made = True + + if changes_made: + try: + save_config_yaml(config_file_path, raw_config) + log.info( + f"{Fore.GREEN}✓{Style.RESET_ALL} Successfully updated " + f"{Fore.BLUE}{config_file_path}{Style.RESET_ALL}.", + ) + except Exception: + log.exception(f"Error saving config to {config_file_path}") + if log.isEnabledFor(logging.DEBUG): + import traceback + + traceback.print_exc() + raise + else: + log.info( + f"{Fore.GREEN}✓{Style.RESET_ALL} No changes made to the configuration.", + ) diff --git a/src/vcspull/cli/sync.py b/src/vcspull/cli/sync.py index 1f754887..149d24ff 100644 --- a/src/vcspull/cli/sync.py +++ b/src/vcspull/cli/sync.py @@ -66,7 +66,7 @@ def create_sync_subparser(parser: argparse.ArgumentParser) -> argparse.ArgumentP def sync( repo_patterns: list[str], - config: pathlib.Path, + config: pathlib.Path | None, exit_on_error: bool, parser: argparse.ArgumentParser | None = None, # optional so sync can be unit tested diff --git a/src/vcspull/config.py b/src/vcspull/config.py index 79f504ad..4edf198f 100644 --- a/src/vcspull/config.py +++ b/src/vcspull/config.py @@ -424,3 +424,21 @@ def is_config_file( extensions = [".yml", ".yaml", ".json"] extensions = [extensions] if isinstance(extensions, str) else extensions return any(filename.endswith(e) for e in extensions) + + +def save_config_yaml(config_file_path: pathlib.Path, data: dict[t.Any, t.Any]) -> None: + """Save configuration data to a YAML file. 
+ + Parameters + ---------- + config_file_path : pathlib.Path + Path to the configuration file to write + data : dict + Configuration data to save + """ + yaml_content = ConfigReader._dump( + fmt="yaml", + content=data, + indent=2, + ) + config_file_path.write_text(yaml_content, encoding="utf-8") diff --git a/src/vcspull/log.py b/src/vcspull/log.py index 10e671f7..d5aee2cc 100644 --- a/src/vcspull/log.py +++ b/src/vcspull/log.py @@ -38,19 +38,41 @@ def setup_logger( if not log: log = logging.getLogger() if not log.handlers: - channel = logging.StreamHandler() - channel.setFormatter(DebugLogFormatter()) + # Setup root vcspull logger with debug formatter + vcspull_logger = logging.getLogger("vcspull") + if not vcspull_logger.handlers: + channel = logging.StreamHandler() + channel.setFormatter(DebugLogFormatter()) + vcspull_logger.setLevel(level) + vcspull_logger.addHandler(channel) + vcspull_logger.propagate = False + + # Setup simple formatter specifically for CLI modules + # These modules provide user-facing output that should be clean + cli_loggers = [ + "vcspull.cli.add", + "vcspull.cli.add_from_fs", + "vcspull.cli.sync", + ] - log.setLevel(level) - log.addHandler(channel) + for logger_name in cli_loggers: + cli_logger = logging.getLogger(logger_name) + if not cli_logger.handlers: + cli_channel = logging.StreamHandler() + cli_channel.setFormatter(SimpleLogFormatter()) + cli_logger.setLevel(level) + cli_logger.addHandler(cli_channel) + cli_logger.propagate = False # setup styling for repo loggers repo_logger = logging.getLogger("libvcs") - channel = logging.StreamHandler() - channel.setFormatter(RepoLogFormatter()) - channel.addFilter(RepoFilter()) - repo_logger.setLevel(level) - repo_logger.addHandler(channel) + if not repo_logger.handlers: + repo_channel = logging.StreamHandler() + repo_channel.setFormatter(RepoLogFormatter()) + repo_channel.addFilter(RepoFilter()) + repo_logger.setLevel(level) + repo_logger.addHandler(repo_channel) + repo_logger.propagate = False class LogFormatter(logging.Formatter): @@ -180,6 +202,14 @@ def template(self, record: logging.LogRecord) -> str: return f"{Fore.GREEN + Style.DIM}|{record.bin_name}| {Fore.YELLOW}({record.keyword}) {Fore.RESET}" # type:ignore # noqa: E501 +class SimpleLogFormatter(logging.Formatter): + """Simple formatter that outputs only the message, like print().""" + + def format(self, record: logging.LogRecord) -> str: + """Format log record to just return the message.""" + return record.getMessage() + + class RepoFilter(logging.Filter): """Only include repo logs for this type of record.""" diff --git a/tests/cli/test_add.py b/tests/cli/test_add.py new file mode 100644 index 00000000..dd50073c --- /dev/null +++ b/tests/cli/test_add.py @@ -0,0 +1,154 @@ +"""Tests for vcspull.cli.add functionality.""" + +from __future__ import annotations + +import typing as t + +import yaml + +from vcspull.cli.add import add_repo + +if t.TYPE_CHECKING: + import pathlib + + from _pytest.logging import LogCaptureFixture + + +class TestAddRepo: + """Test add_repo function.""" + + def test_add_simple_repo( + self, + tmp_path: pathlib.Path, + caplog: LogCaptureFixture, + ) -> None: + """Test adding a simple repository.""" + caplog.set_level("INFO") + + config_file = tmp_path / ".vcspull.yaml" + + # Add a repository + add_repo( + name="myproject", + url="git@github.com:user/myproject.git", + config_file_path_str=str(config_file), + path=None, + base_dir=None, + ) + + # Verify config file was created with verbose format + assert config_file.exists() + with 
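# --- example: clean CLI output (annotation) ---
# SimpleLogFormatter returns only record.getMessage(), so CLI-facing loggers
# read like print() output -- no timestamp, level, or logger name:
import logging

from vcspull.log import SimpleLogFormatter

rec = logging.LogRecord(
    name="vcspull.cli.add",
    level=logging.INFO,
    pathname=__file__,
    lineno=1,
    msg="Successfully added %r",
    args=("myproject",),
    exc_info=None,
)
assert SimpleLogFormatter().format(rec) == "Successfully added 'myproject'"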
config_file.open() as f: + config_data = yaml.safe_load(f) + + # Check verbose format + assert "./" in config_data + assert "myproject" in config_data["./"] + assert config_data["./"]["myproject"] == { + "repo": "git@github.com:user/myproject.git", + } + + # Check success message + assert "Successfully added 'myproject'" in caplog.text + + def test_add_with_custom_base_dir( + self, + tmp_path: pathlib.Path, + caplog: LogCaptureFixture, + ) -> None: + """Test adding a repository with custom base directory.""" + caplog.set_level("INFO") + + config_file = tmp_path / ".vcspull.yaml" + + # Add a repository with custom base dir + add_repo( + name="mylib", + url="https://github.com/org/mylib", + config_file_path_str=str(config_file), + path=None, + base_dir="~/projects/libs", + ) + + # Verify config + with config_file.open() as f: + config_data = yaml.safe_load(f) + + assert "~/projects/libs/" in config_data + assert config_data["~/projects/libs/"]["mylib"] == { + "repo": "https://github.com/org/mylib", + } + + def test_add_duplicate_repo( + self, + tmp_path: pathlib.Path, + caplog: LogCaptureFixture, + ) -> None: + """Test adding a duplicate repository shows warning.""" + caplog.set_level("WARNING") + + config_file = tmp_path / ".vcspull.yaml" + + # Pre-create config with existing repo + existing_config = { + "~/code/": {"existing": {"repo": "git@github.com:user/existing.git"}}, + } + with config_file.open("w") as f: + yaml.dump(existing_config, f) + + # Try to add duplicate + add_repo( + name="existing", + url="git@github.com:other/existing.git", + config_file_path_str=str(config_file), + path=None, + base_dir="~/code", + ) + + # Should show warning + assert "Repository 'existing' already exists" in caplog.text + assert "Current URL: git@github.com:user/existing.git" in caplog.text + + # Config should not be changed + with config_file.open() as f: + config_data = yaml.safe_load(f) + assert ( + config_data["~/code/"]["existing"]["repo"] + == "git@github.com:user/existing.git" + ) + + def test_add_to_existing_config( + self, + tmp_path: pathlib.Path, + caplog: LogCaptureFixture, + ) -> None: + """Test adding to an existing config file.""" + caplog.set_level("INFO") + + config_file = tmp_path / ".vcspull.yaml" + + # Pre-create config with some repos + existing_config = { + "~/work/": {"project1": {"repo": "git@github.com:user/project1.git"}}, + } + with config_file.open("w") as f: + yaml.dump(existing_config, f) + + # Add new repo to same base dir + add_repo( + name="project2", + url="git@github.com:user/project2.git", + config_file_path_str=str(config_file), + path=None, + base_dir="~/work", + ) + + # Verify both repos exist + with config_file.open() as f: + config_data = yaml.safe_load(f) + + assert "project1" in config_data["~/work/"] + assert "project2" in config_data["~/work/"] + assert config_data["~/work/"]["project2"] == { + "repo": "git@github.com:user/project2.git", + } diff --git a/tests/cli/test_add_from_fs.py b/tests/cli/test_add_from_fs.py new file mode 100644 index 00000000..d6e73d6d --- /dev/null +++ b/tests/cli/test_add_from_fs.py @@ -0,0 +1,649 @@ +"""Tests for vcspull.cli.add_from_fs using libvcs fixtures.""" + +from __future__ import annotations + +import subprocess +import typing as t + +import yaml + +from vcspull.cli.add_from_fs import add_from_filesystem, get_git_origin_url +from vcspull.config import save_config_yaml + +if t.TYPE_CHECKING: + import pathlib + + import pytest + from _pytest.logging import LogCaptureFixture + from libvcs.pytest_plugin import 
CreateRepoPytestFixtureFn + + +class TestGetGitOriginUrl: + """Test get_git_origin_url function with real git repos.""" + + def test_success( + self, + create_git_remote_repo: CreateRepoPytestFixtureFn, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + ) -> None: + """Test successfully getting origin URL from a git repository.""" + # Create a remote repository + remote_path = create_git_remote_repo() + remote_url = f"file://{remote_path}" + + # Clone it + local_repo_path = tmp_path / "test_repo" + subprocess.run( + ["git", "clone", remote_url, str(local_repo_path)], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + + # Test getting origin URL + url = get_git_origin_url(local_repo_path) + assert url == remote_url + + def test_no_remote( + self, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + caplog: LogCaptureFixture, + ) -> None: + """Test handling repository with no origin remote.""" + # Create a local git repo without remote + repo_path = tmp_path / "local_only" + repo_path.mkdir() + subprocess.run( + ["git", "init"], + cwd=repo_path, + check=True, + capture_output=True, + env=git_commit_envvars, + ) + + # Should return None and log debug message + caplog.set_level("DEBUG") + url = get_git_origin_url(repo_path) + assert url is None + assert "Could not get origin URL" in caplog.text + + def test_not_git_repo( + self, + tmp_path: pathlib.Path, + caplog: LogCaptureFixture, + ) -> None: + """Test handling non-git directory.""" + # Create a regular directory + regular_dir = tmp_path / "not_git" + regular_dir.mkdir() + + # Should return None + caplog.set_level("DEBUG") + url = get_git_origin_url(regular_dir) + assert url is None + assert "Could not get origin URL" in caplog.text + + +class TestAddFromFilesystem: + """Test add_from_filesystem with real git repositories.""" + + def test_single_repo( + self, + create_git_remote_repo: CreateRepoPytestFixtureFn, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + caplog: LogCaptureFixture, + ) -> None: + """Test scanning directory with one git repository.""" + caplog.set_level("INFO") + + # Create a scan directory + scan_dir = tmp_path / "projects" + scan_dir.mkdir() + + # Create and clone a repository + remote_path = create_git_remote_repo() + remote_url = f"file://{remote_path}" + repo_name = "myproject" + local_repo_path = scan_dir / repo_name + + subprocess.run( + ["git", "clone", remote_url, str(local_repo_path)], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + + # Create config file path + config_file = tmp_path / ".vcspull.yaml" + + # Run add_from_filesystem + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=True, + base_dir_key_arg=None, + yes=True, + ) + + # Verify config file was created with correct content + assert config_file.exists() + with config_file.open() as f: + config_data = yaml.safe_load(f) + + # Check the repository was added with correct structure + expected_key = str(scan_dir) + "/" + assert expected_key in config_data + assert repo_name in config_data[expected_key] + assert config_data[expected_key][repo_name] == {"repo": remote_url} + + # Check log messages + assert f"Adding '{repo_name}' ({remote_url})" in caplog.text + assert f"Successfully updated {config_file}" in caplog.text + + def test_multiple_repos_recursive( + self, + create_git_remote_repo: CreateRepoPytestFixtureFn, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + caplog: LogCaptureFixture, + ) -> None: 
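# --- note: libvcs pytest fixtures (annotation, hedged) ---
# These tests lean on libvcs's pytest plugin: create_git_remote_repo() returns
# the path of a fresh bare repository (cloneable via file://), and
# git_commit_envvars supplies committer-identity environment variables so git
# works in a clean CI environment. Minimal standalone shape of the pattern:
import pathlib
import subprocess


def test_clone_from_fixture(
    create_git_remote_repo,
    git_commit_envvars: dict[str, str],
    tmp_path: pathlib.Path,
) -> None:
    remote = create_git_remote_repo()
    subprocess.run(
        ["git", "clone", f"file://{remote}", str(tmp_path / "checkout")],
        check=True,
        capture_output=True,
        env=git_commit_envvars,
    )
    assert (tmp_path / "checkout" / ".git").is_dir()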
+ """Test scanning directory recursively with multiple git repositories.""" + caplog.set_level("INFO") + + # Create directory structure + scan_dir = tmp_path / "workspace" + scan_dir.mkdir() + subdir = scan_dir / "subfolder" + subdir.mkdir() + + # Create multiple repositories + repos = [] + for _i, (parent, name) in enumerate( + [ + (scan_dir, "repo1"), + (scan_dir, "repo2"), + (subdir, "nested_repo"), + ], + ): + remote_path = create_git_remote_repo() + remote_url = f"file://{remote_path}" + local_path = parent / name + + subprocess.run( + ["git", "clone", remote_url, str(local_path)], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + repos.append((name, remote_url)) + + # Create config file + config_file = tmp_path / ".vcspull.yaml" + + # Run add_from_filesystem recursively + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=True, + base_dir_key_arg=None, + yes=True, + ) + + # Verify all repos were added + with config_file.open() as f: + config_data = yaml.safe_load(f) + + expected_key = str(scan_dir) + "/" + assert expected_key in config_data + + for name, url in repos: + assert name in config_data[expected_key] + assert config_data[expected_key][name] == {"repo": url} + + def test_non_recursive( + self, + create_git_remote_repo: CreateRepoPytestFixtureFn, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + ) -> None: + """Test non-recursive scan only finds top-level repos.""" + # Create directory structure + scan_dir = tmp_path / "workspace" + scan_dir.mkdir() + nested_dir = scan_dir / "nested" + nested_dir.mkdir() + + # Create repos at different levels + # Top-level repo + remote1 = create_git_remote_repo() + subprocess.run( + ["git", "clone", f"file://{remote1}", str(scan_dir / "top_repo")], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + + # Nested repo (should not be found) + remote2 = create_git_remote_repo() + subprocess.run( + ["git", "clone", f"file://{remote2}", str(nested_dir / "nested_repo")], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + + config_file = tmp_path / ".vcspull.yaml" + + # Run non-recursive scan + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=False, + base_dir_key_arg=None, + yes=True, + ) + + # Verify only top-level repo was found + with config_file.open() as f: + config_data = yaml.safe_load(f) + + expected_key = str(scan_dir) + "/" + assert "top_repo" in config_data[expected_key] + assert "nested_repo" not in config_data[expected_key] + + def test_custom_base_dir_key( + self, + create_git_remote_repo: CreateRepoPytestFixtureFn, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + ) -> None: + """Test using a custom base directory key.""" + # Create and clone a repo + scan_dir = tmp_path / "repos" + scan_dir.mkdir() + + remote_path = create_git_remote_repo() + remote_url = f"file://{remote_path}" + repo_name = "test_repo" + + subprocess.run( + ["git", "clone", remote_url, str(scan_dir / repo_name)], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + + config_file = tmp_path / ".vcspull.yaml" + custom_key = "~/my_projects/" + + # Run with custom base dir key + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=True, + base_dir_key_arg=custom_key, + yes=True, + ) + + # Verify custom key was used + with config_file.open() as f: + config_data = yaml.safe_load(f) + + assert custom_key in 
config_data + assert repo_name in config_data[custom_key] + + def test_skip_existing_repos( + self, + create_git_remote_repo: CreateRepoPytestFixtureFn, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + caplog: LogCaptureFixture, + ) -> None: + """Test that existing repos in config are skipped.""" + caplog.set_level("INFO") + + # Create a repo + scan_dir = tmp_path / "repos" + scan_dir.mkdir() + + remote_path = create_git_remote_repo() + remote_url = f"file://{remote_path}" + repo_name = "existing_repo" + + subprocess.run( + ["git", "clone", remote_url, str(scan_dir / repo_name)], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + + # Pre-create config with this repo + config_file = tmp_path / ".vcspull.yaml" + config_data = {str(scan_dir) + "/": {repo_name: remote_url}} + save_config_yaml(config_file, config_data) + + # Run add_from_filesystem + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=True, + base_dir_key_arg=None, + yes=True, + ) + + # Verify enhanced output for existing repos + assert "Found 1 existing repositories in configuration:" in caplog.text + assert f"• {repo_name} ({remote_url})" in caplog.text + assert f"at {scan_dir!s}/{repo_name} in {config_file}" in caplog.text + assert ( + "All found repositories already exist in the configuration. Nothing to do." + in caplog.text + ) + + def test_user_confirmation( + self, + create_git_remote_repo: CreateRepoPytestFixtureFn, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + monkeypatch: pytest.MonkeyPatch, + caplog: LogCaptureFixture, + ) -> None: + """Test user confirmation prompt.""" + caplog.set_level("INFO") + + # Create a repo + scan_dir = tmp_path / "repos" + scan_dir.mkdir() + + remote_path = create_git_remote_repo() + remote_url = f"file://{remote_path}" + + subprocess.run( + ["git", "clone", remote_url, str(scan_dir / "repo1")], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + + config_file = tmp_path / ".vcspull.yaml" + + # Mock user input as "n" (no) + monkeypatch.setattr("builtins.input", lambda _: "n") + + # Run without --yes flag + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=True, + base_dir_key_arg=None, + yes=False, + ) + + # Verify aborted + assert "Aborted by user" in caplog.text + assert not config_file.exists() + + def test_no_repos_found( + self, + tmp_path: pathlib.Path, + caplog: LogCaptureFixture, + ) -> None: + """Test handling when no git repositories are found.""" + caplog.set_level("INFO") + + # Create empty directory + scan_dir = tmp_path / "empty" + scan_dir.mkdir() + + config_file = tmp_path / ".vcspull.yaml" + + # Run scan + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=True, + base_dir_key_arg=None, + yes=True, + ) + + # Verify appropriate message + assert f"No git repositories found in {scan_dir}" in caplog.text + assert not config_file.exists() + + def test_repo_without_origin( + self, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + caplog: LogCaptureFixture, + ) -> None: + """Test handling repository without origin remote.""" + caplog.set_level("WARNING") + + # Create scan directory + scan_dir = tmp_path / "repos" + scan_dir.mkdir() + + # Create local git repo without remote + repo_path = scan_dir / "local_only" + repo_path.mkdir() + subprocess.run( + ["git", "init"], + cwd=repo_path, + check=True, + capture_output=True, + 
env=git_commit_envvars, + ) + + config_file = tmp_path / ".vcspull.yaml" + + # Run scan + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=True, + base_dir_key_arg=None, + yes=True, + ) + + # Verify warning and repo was skipped + assert ( + f"Could not determine remote URL for git repository at {repo_path}" + in caplog.text + ) + assert not config_file.exists() # No repos added, so no file created + + def test_detailed_existing_repos_output( + self, + create_git_remote_repo: CreateRepoPytestFixtureFn, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + caplog: LogCaptureFixture, + ) -> None: + """Test detailed output when multiple repositories already exist.""" + caplog.set_level("INFO") + + # Create scan directory with multiple repos + scan_dir = tmp_path / "existing_repos" + scan_dir.mkdir() + + # Create multiple repositories + repos_data = [] + for _i, repo_name in enumerate(["repo1", "repo2", "repo3"]): + remote_path = create_git_remote_repo() + remote_url = f"file://{remote_path}" + local_repo_path = scan_dir / repo_name + + subprocess.run( + ["git", "clone", remote_url, str(local_repo_path)], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + repos_data.append((repo_name, remote_url)) + + # Pre-create config with all repos + config_file = tmp_path / ".vcspull.yaml" + config_data = {str(scan_dir) + "/": dict(repos_data)} + save_config_yaml(config_file, config_data) + + # Run add_from_filesystem + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=True, + base_dir_key_arg=None, + yes=True, + ) + + # Verify detailed output + assert "Found 3 existing repositories in configuration:" in caplog.text + + # Check each repository is listed with correct details + for repo_name, remote_url in repos_data: + assert f"• {repo_name} ({remote_url})" in caplog.text + assert f"at {scan_dir!s}/{repo_name} in {config_file}" in caplog.text + + # Verify final message + assert ( + "All found repositories already exist in the configuration. Nothing to do." 
+ in caplog.text + ) + + def test_mixed_existing_and_new_repos( + self, + create_git_remote_repo: CreateRepoPytestFixtureFn, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + caplog: LogCaptureFixture, + ) -> None: + """Test output when some repos exist and some are new.""" + caplog.set_level("INFO") + + # Create scan directory + scan_dir = tmp_path / "mixed_repos" + scan_dir.mkdir() + + # Create repositories + existing_repo_data = [] + new_repo_data = [] + + # Create two existing repos + for _i, repo_name in enumerate(["existing1", "existing2"]): + remote_path = create_git_remote_repo() + remote_url = f"file://{remote_path}" + local_repo_path = scan_dir / repo_name + + subprocess.run( + ["git", "clone", remote_url, str(local_repo_path)], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + existing_repo_data.append((repo_name, remote_url)) + + # Create two new repos + for _i, repo_name in enumerate(["new1", "new2"]): + remote_path = create_git_remote_repo() + remote_url = f"file://{remote_path}" + local_repo_path = scan_dir / repo_name + + subprocess.run( + ["git", "clone", remote_url, str(local_repo_path)], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + new_repo_data.append((repo_name, remote_url)) + + # Pre-create config with only existing repos + config_file = tmp_path / ".vcspull.yaml" + config_data = {str(scan_dir) + "/": dict(existing_repo_data)} + save_config_yaml(config_file, config_data) + + # Run add_from_filesystem + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=True, + base_dir_key_arg=None, + yes=True, + ) + + # Verify existing repos are listed + assert "Found 2 existing repositories in configuration:" in caplog.text + for repo_name, remote_url in existing_repo_data: + assert f"• {repo_name} ({remote_url})" in caplog.text + assert f"at {scan_dir!s}/{repo_name} in {config_file}" in caplog.text + + # Verify new repos are added + for repo_name, remote_url in new_repo_data: + assert f"Adding '{repo_name}' ({remote_url})" in caplog.text + + assert "Successfully updated" in caplog.text + + def test_many_existing_repos_summary( + self, + create_git_remote_repo: CreateRepoPytestFixtureFn, + tmp_path: pathlib.Path, + git_commit_envvars: dict[str, str], + caplog: LogCaptureFixture, + ) -> None: + """Test that many existing repos show summary instead of full list.""" + caplog.set_level("INFO") + + # Create scan directory + scan_dir = tmp_path / "many_repos" + scan_dir.mkdir() + + # Create many existing repos (more than 5) + existing_repo_data = [] + for i in range(8): + repo_name = f"existing{i}" + remote_path = create_git_remote_repo() + remote_url = f"file://{remote_path}" + local_repo_path = scan_dir / repo_name + + subprocess.run( + ["git", "clone", remote_url, str(local_repo_path)], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + existing_repo_data.append((repo_name, remote_url)) + + # Create one new repo + new_remote = create_git_remote_repo() + new_url = f"file://{new_remote}" + subprocess.run( + ["git", "clone", new_url, str(scan_dir / "new_repo")], + check=True, + capture_output=True, + env=git_commit_envvars, + ) + + # Pre-create config with existing repos + config_file = tmp_path / ".vcspull.yaml" + config_data = {str(scan_dir) + "/": dict(existing_repo_data)} + save_config_yaml(config_file, config_data) + + # Run add_from_filesystem + add_from_filesystem( + scan_dir_str=str(scan_dir), + config_file_path_str=str(config_file), + recursive=True, + 
base_dir_key_arg=None, + yes=True, + ) + + # Verify summary message for many repos + assert "Found 8 existing repositories already in configuration." in caplog.text + # Should NOT list individual repos + assert "• existing0" not in caplog.text + assert "• existing7" not in caplog.text + + # Verify new repo is shown clearly + assert "Found 1 new repository to add:" in caplog.text + assert "+ new_repo" in caplog.text diff --git a/tests/test_cli.py b/tests/test_cli.py index 43c02d17..cc221542 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -103,7 +103,8 @@ def test_sync_cli_filter_non_existent( with contextlib.suppress(SystemExit): cli(["sync", *sync_args]) - output = "".join(list(caplog.messages) + list(capsys.readouterr().out)) + captured = capsys.readouterr() + output = "".join([*caplog.messages, captured.out, captured.err]) if expected_in_out is not None: if isinstance(expected_in_out, str): diff --git a/tests/test_log.py b/tests/test_log.py new file mode 100644 index 00000000..b612ffc3 --- /dev/null +++ b/tests/test_log.py @@ -0,0 +1,739 @@ +"""Tests for vcspull logging utilities.""" + +from __future__ import annotations + +import logging +import typing as t + +import pytest +from colorama import Fore + +from vcspull.log import ( + LEVEL_COLORS, + DebugLogFormatter, + LogFormatter, + RepoFilter, + RepoLogFormatter, + SimpleLogFormatter, + setup_logger, +) + +if t.TYPE_CHECKING: + from _pytest.logging import LogCaptureFixture + + +@pytest.fixture(autouse=True) +def cleanup_loggers() -> t.Iterator[None]: + """Clean up logger configuration after each test.""" + yield + # Reset logging configuration to avoid test interference + for logger_name in [ + "vcspull", + "vcspull.cli.add", + "vcspull.cli.add_from_fs", + "vcspull.cli.sync", + "libvcs", + "test_logger", + "", # Root logger + ]: + logger = logging.getLogger(logger_name) + logger.handlers.clear() + logger.propagate = True + + +class TestLevelColors: + """Test LEVEL_COLORS constant.""" + + def test_level_colors_defined(self) -> None: + """Test that all standard log levels have color mappings.""" + expected_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + for level in expected_levels: + assert level in LEVEL_COLORS + + def test_level_color_values(self) -> None: + """Test that color values are correct colorama constants.""" + assert LEVEL_COLORS["DEBUG"] == Fore.BLUE + assert LEVEL_COLORS["INFO"] == Fore.GREEN + assert LEVEL_COLORS["WARNING"] == Fore.YELLOW + assert LEVEL_COLORS["ERROR"] == Fore.RED + assert LEVEL_COLORS["CRITICAL"] == Fore.RED + + +class LogFormatterTestCase(t.NamedTuple): + """Test case for log formatter behavior.""" + + test_id: str + level: str + message: str + logger_name: str + expected_contains: list[str] + expected_not_contains: list[str] = [] # noqa: RUF012 + + +LOG_FORMATTER_TEST_CASES: list[LogFormatterTestCase] = [ + LogFormatterTestCase( + test_id="info_level", + level="INFO", + message="test info message", + logger_name="test.logger", + expected_contains=["(INFO)", "test info message", "test.logger"], + ), + LogFormatterTestCase( + test_id="debug_level", + level="DEBUG", + message="debug information", + logger_name="debug.logger", + expected_contains=["(DEBUG)", "debug information", "debug.logger"], + ), + LogFormatterTestCase( + test_id="warning_level", + level="WARNING", + message="warning message", + logger_name="warn.logger", + expected_contains=["(WARNING)", "warning message", "warn.logger"], + ), + LogFormatterTestCase( + test_id="error_level", + level="ERROR", + message="error 
occurred", + logger_name="error.logger", + expected_contains=["(ERROR)", "error occurred", "error.logger"], + ), +] + + +class TestLogFormatter: + """Test LogFormatter class.""" + + def test_template_includes_required_elements(self) -> None: + """Test that template includes all required formatting elements.""" + formatter = LogFormatter() + record = logging.LogRecord( + name="test.logger", + level=logging.INFO, + pathname="/test/path.py", + lineno=42, + msg="test message", + args=(), + exc_info=None, + ) + + template = formatter.template(record) + + # Should include levelname, asctime, and name placeholders + assert "%(levelname)" in template + assert "%(asctime)s" in template + assert "%(name)s" in template + + def test_format_basic_message(self) -> None: + """Test formatting a basic log message.""" + formatter = LogFormatter() + record = logging.LogRecord( + name="test.logger", + level=logging.INFO, + pathname="/test/path.py", + lineno=42, + msg="test message", + args=(), + exc_info=None, + ) + + result = formatter.format(record) + + assert "test message" in result + assert "test.logger" in result + assert "(INFO)" in result + + def test_format_handles_newlines(self) -> None: + """Test that multiline messages are properly indented.""" + formatter = LogFormatter() + record = logging.LogRecord( + name="test.logger", + level=logging.INFO, + pathname="/test/path.py", + lineno=42, + msg="line 1\nline 2\nline 3", + args=(), + exc_info=None, + ) + + result = formatter.format(record) + + # Newlines should be replaced with newline + indent + assert "\n line 2" in result + assert "\n line 3" in result + + def test_format_handles_bad_message(self) -> None: + """Test formatter handles malformed log messages gracefully.""" + formatter = LogFormatter() + + # Create a record that will cause getMessage() to fail + record = logging.LogRecord( + name="test.logger", + level=logging.INFO, + pathname="/test/path.py", + lineno=42, + msg="bad format %s %s", # Two placeholders + args=("only_one_arg",), # But only one argument + exc_info=None, + ) + + result = formatter.format(record) + + assert "Bad message" in result + + @pytest.mark.parametrize( + list(LogFormatterTestCase._fields), + LOG_FORMATTER_TEST_CASES, + ids=[test.test_id for test in LOG_FORMATTER_TEST_CASES], + ) + def test_formatter_levels( + self, + test_id: str, + level: str, + message: str, + logger_name: str, + expected_contains: list[str], + expected_not_contains: list[str], + ) -> None: + """Test formatter with different log levels.""" + formatter = LogFormatter() + level_int = getattr(logging, level) + + record = logging.LogRecord( + name=logger_name, + level=level_int, + pathname="/test/path.py", + lineno=42, + msg=message, + args=(), + exc_info=None, + ) + + result = formatter.format(record) + + for expected in expected_contains: + assert expected in result + + for not_expected in expected_not_contains: + assert not_expected not in result + + +class TestDebugLogFormatter: + """Test DebugLogFormatter class.""" + + def test_template_includes_debug_elements(self) -> None: + """Test that debug template includes module and function info.""" + formatter = DebugLogFormatter() + record = logging.LogRecord( + name="test.logger", + level=logging.INFO, + pathname="/test/module.py", + lineno=42, + msg="test message", + args=(), + exc_info=None, + ) + record.module = "test_module" + record.funcName = "test_function" + + template = formatter.template(record) + + # Should include module.funcName and lineno + assert "%(module)s.%(funcName)s()" in template 
+ assert "%(lineno)d" in template + + def test_format_with_debug_info(self) -> None: + """Test formatting with debug information.""" + formatter = DebugLogFormatter() + record = logging.LogRecord( + name="test.logger", + level=logging.DEBUG, + pathname="/test/module.py", + lineno=123, + msg="debug message", + args=(), + exc_info=None, + ) + record.module = "test_module" + record.funcName = "test_function" + + result = formatter.format(record) + + assert "debug message" in result + assert "test_module.test_function()" in result + assert "123" in result + # DebugLogFormatter uses single letter level names + assert "(D)" in result + + +class TestSimpleLogFormatter: + """Test SimpleLogFormatter class.""" + + def test_format_returns_only_message(self) -> None: + """Test that simple formatter returns only the message.""" + formatter = SimpleLogFormatter() + record = logging.LogRecord( + name="test.logger", + level=logging.INFO, + pathname="/test/path.py", + lineno=42, + msg="simple message", + args=(), + exc_info=None, + ) + + result = formatter.format(record) + + # Should only contain the message, no extra formatting + assert result == "simple message" + + def test_format_with_arguments(self) -> None: + """Test simple formatter with message arguments.""" + formatter = SimpleLogFormatter() + record = logging.LogRecord( + name="test.logger", + level=logging.INFO, + pathname="/test/path.py", + lineno=42, + msg="message with %s and %d", + args=("text", 42), + exc_info=None, + ) + + result = formatter.format(record) + + assert result == "message with text and 42" + + def test_format_excludes_metadata(self) -> None: + """Test that simple formatter excludes timestamp, level, logger name.""" + formatter = SimpleLogFormatter() + record = logging.LogRecord( + name="very.long.logger.name", + level=logging.WARNING, + pathname="/very/long/path/to/module.py", + lineno=999, + msg="clean message", + args=(), + exc_info=None, + ) + + result = formatter.format(record) + + # Should not contain any metadata + assert "very.long.logger.name" not in result + assert "WARNING" not in result + assert "/very/long/path" not in result + assert "999" not in result + assert result == "clean message" + + +class TestRepoLogFormatter: + """Test RepoLogFormatter class.""" + + def test_template_formats_repo_info(self) -> None: + """Test that repo template includes bin_name and keyword.""" + formatter = RepoLogFormatter() + record = logging.LogRecord( + name="libvcs.sync.git", + level=logging.INFO, + pathname="/libvcs/sync/git.py", + lineno=42, + msg="git operation", + args=(), + exc_info=None, + ) + record.bin_name = "git" + record.keyword = "clone" + record.message = "git operation" # RepoLogFormatter expects this + + template = formatter.template(record) + + # Template should reference the actual values, not the variable names + assert "git" in template + assert "clone" in template + + def test_format_repo_message(self) -> None: + """Test formatting a repository operation message.""" + formatter = RepoLogFormatter() + record = logging.LogRecord( + name="libvcs.sync.git", + level=logging.INFO, + pathname="/libvcs/sync/git.py", + lineno=42, + msg="Cloning repository", + args=(), + exc_info=None, + ) + record.bin_name = "git" + record.keyword = "clone" + + result = formatter.format(record) + + # Should include bin_name and keyword formatting + assert "git" in result + assert "clone" in result + assert "Cloning repository" in result + + +class TestRepoFilter: + """Test RepoFilter class.""" + + def 
test_filter_accepts_repo_records(self) -> None: + """Test that filter accepts records with keyword attribute.""" + repo_filter = RepoFilter() + record = logging.LogRecord( + name="libvcs.sync.git", + level=logging.INFO, + pathname="/libvcs/sync/git.py", + lineno=42, + msg="repo message", + args=(), + exc_info=None, + ) + record.keyword = "clone" + + assert repo_filter.filter(record) is True + + def test_filter_rejects_non_repo_records(self) -> None: + """Test that filter rejects records without keyword attribute.""" + repo_filter = RepoFilter() + record = logging.LogRecord( + name="regular.logger", + level=logging.INFO, + pathname="/regular/module.py", + lineno=42, + msg="regular message", + args=(), + exc_info=None, + ) + # No keyword attribute + + assert repo_filter.filter(record) is False + + def test_filter_rejects_empty_keyword(self) -> None: + """Test that filter works correctly with keyword attribute present.""" + repo_filter = RepoFilter() + record = logging.LogRecord( + name="libvcs.sync.git", + level=logging.INFO, + pathname="/libvcs/sync/git.py", + lineno=42, + msg="repo message", + args=(), + exc_info=None, + ) + # Set keyword to test the "keyword" in record.__dict__ check + record.__dict__["keyword"] = "pull" + + assert repo_filter.filter(record) is True + + +class TestSetupLogger: + """Test setup_logger function.""" + + def test_setup_logger_default_behavior(self, caplog: LogCaptureFixture) -> None: + """Test setup_logger with default parameters.""" + # Use a test logger to avoid interfering with main logger + test_logger = logging.getLogger("test_logger") + test_logger.handlers.clear() + + setup_logger(test_logger, level="INFO") + + # setup_logger doesn't add handlers to individual loggers anymore + # it sets up the vcspull logger hierarchy + vcspull_logger = logging.getLogger("vcspull") + assert len(vcspull_logger.handlers) > 0 + + def test_setup_logger_custom_level(self, caplog: LogCaptureFixture) -> None: + """Test setup_logger with custom log level.""" + # Clear handlers first to avoid interference + root_logger = logging.getLogger() + for logger_name in [ + "vcspull", + "vcspull.cli.add", + "vcspull.cli.add_from_fs", + "vcspull.cli.sync", + "libvcs", + ]: + logger = logging.getLogger(logger_name) + logger.handlers.clear() + root_logger.handlers.clear() + + setup_logger(level="DEBUG") + + # Check that loggers were set to DEBUG level + vcspull_logger = logging.getLogger("vcspull") + assert vcspull_logger.level == logging.DEBUG + + def test_setup_logger_creates_vcspull_logger( + self, + caplog: LogCaptureFixture, + ) -> None: + """Test that setup_logger creates vcspull logger with debug formatter.""" + # Clear handlers first to avoid interference + root_logger = logging.getLogger() + for logger_name in [ + "vcspull", + "vcspull.cli.add", + "vcspull.cli.add_from_fs", + "vcspull.cli.sync", + "libvcs", + ]: + logger = logging.getLogger(logger_name) + logger.handlers.clear() + root_logger.handlers.clear() + + setup_logger(level="INFO") + + vcspull_logger = logging.getLogger("vcspull") + assert len(vcspull_logger.handlers) > 0 + assert vcspull_logger.propagate is False + + # Test that it uses DebugLogFormatter by checking handler type + handler = vcspull_logger.handlers[0] + assert isinstance(handler.formatter, DebugLogFormatter) + + def test_setup_logger_creates_cli_add_logger( + self, + caplog: LogCaptureFixture, + ) -> None: + """Test that setup_logger creates CLI add logger with simple formatter.""" + # Clear handlers first to avoid interference + root_logger = 
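# --- example: idempotent handler setup (annotation) ---
# Every logger block in setup_logger is guarded by `if not logger.handlers:`,
# so repeated calls must not stack handlers -- the property these tests pin down:
import logging

from vcspull.log import setup_logger

setup_logger(level="INFO")
before = len(logging.getLogger("vcspull").handlers)
setup_logger(level="INFO")
assert len(logging.getLogger("vcspull").handlers) == before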
logging.getLogger() + for logger_name in [ + "vcspull", + "vcspull.cli.add", + "vcspull.cli.add_from_fs", + "vcspull.cli.sync", + "libvcs", + ]: + logger = logging.getLogger(logger_name) + logger.handlers.clear() + root_logger.handlers.clear() + + setup_logger(level="INFO") + + add_logger = logging.getLogger("vcspull.cli.add") + assert len(add_logger.handlers) > 0 + assert add_logger.propagate is False + + # Test that it uses SimpleLogFormatter + handler = add_logger.handlers[0] + assert isinstance(handler.formatter, SimpleLogFormatter) + + def test_setup_logger_creates_cli_add_fs_logger( + self, + caplog: LogCaptureFixture, + ) -> None: + """Test that setup_logger creates CLI add-from-fs logger.""" + # Clear handlers first to avoid interference + root_logger = logging.getLogger() + for logger_name in [ + "vcspull", + "vcspull.cli.add", + "vcspull.cli.add_from_fs", + "vcspull.cli.sync", + "libvcs", + ]: + logger = logging.getLogger(logger_name) + logger.handlers.clear() + root_logger.handlers.clear() + + setup_logger(level="INFO") + + add_fs_logger = logging.getLogger("vcspull.cli.add_from_fs") + assert len(add_fs_logger.handlers) > 0 + assert add_fs_logger.propagate is False + + # Test that it uses SimpleLogFormatter + handler = add_fs_logger.handlers[0] + assert isinstance(handler.formatter, SimpleLogFormatter) + + def test_setup_logger_creates_cli_sync_logger( + self, + caplog: LogCaptureFixture, + ) -> None: + """Test that setup_logger creates CLI sync logger.""" + # Clear handlers first to avoid interference + root_logger = logging.getLogger() + for logger_name in [ + "vcspull", + "vcspull.cli.add", + "vcspull.cli.add_from_fs", + "vcspull.cli.sync", + "libvcs", + ]: + logger = logging.getLogger(logger_name) + logger.handlers.clear() + root_logger.handlers.clear() + + setup_logger(level="INFO") + + sync_logger = logging.getLogger("vcspull.cli.sync") + assert len(sync_logger.handlers) > 0 + assert sync_logger.propagate is False + + # Test that it uses SimpleLogFormatter + handler = sync_logger.handlers[0] + assert isinstance(handler.formatter, SimpleLogFormatter) + + def test_setup_logger_creates_libvcs_logger( + self, + caplog: LogCaptureFixture, + ) -> None: + """Test that setup_logger creates libvcs logger with repo formatter.""" + # Clear handlers first to avoid interference + root_logger = logging.getLogger() + for logger_name in [ + "vcspull", + "vcspull.cli.add", + "vcspull.cli.add_from_fs", + "vcspull.cli.sync", + "libvcs", + ]: + logger = logging.getLogger(logger_name) + logger.handlers.clear() + root_logger.handlers.clear() + + setup_logger(level="INFO") + + libvcs_logger = logging.getLogger("libvcs") + assert len(libvcs_logger.handlers) > 0 + assert libvcs_logger.propagate is False + + # Test that it uses RepoLogFormatter + handler = libvcs_logger.handlers[0] + assert isinstance(handler.formatter, RepoLogFormatter) + + def test_setup_logger_prevents_duplicate_handlers( + self, + caplog: LogCaptureFixture, + ) -> None: + """Test that setup_logger doesn't create duplicate handlers.""" + test_logger = logging.getLogger("test_logger") + test_logger.handlers.clear() + + # Call setup_logger twice + setup_logger(test_logger, level="INFO") + initial_handler_count = len(test_logger.handlers) + + setup_logger(test_logger, level="INFO") + final_handler_count = len(test_logger.handlers) + + # Should not have added more handlers + assert initial_handler_count == final_handler_count + + def test_setup_logger_with_none_creates_root_logger( + self, + caplog: LogCaptureFixture, + ) -> None: + 
"""Test that setup_logger with None creates root logger configuration.""" + # Clear handlers first to avoid interference + root_logger = logging.getLogger() + for logger_name in [ + "vcspull", + "vcspull.cli.add", + "vcspull.cli.add_from_fs", + "vcspull.cli.sync", + "libvcs", + ]: + logger = logging.getLogger(logger_name) + logger.handlers.clear() + root_logger.handlers.clear() + + # This tests the default behavior when no logger is passed + setup_logger(log=None, level="WARNING") + + # Should have created the vcspull logger hierarchy + vcspull_logger = logging.getLogger("vcspull") + assert len(vcspull_logger.handlers) > 0 + assert vcspull_logger.level == logging.WARNING + + +class TestLoggerIntegration: + """Test logger integration and behavior.""" + + def test_simple_formatter_integration(self, caplog: LogCaptureFixture) -> None: + """Test SimpleLogFormatter integration with actual logger.""" + logger = logging.getLogger("test_simple") + logger.handlers.clear() + + # Add handler with simple formatter + handler = logging.StreamHandler() + handler.setFormatter(SimpleLogFormatter()) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + + # Test logging + logger.info("clean message") + + # caplog should capture the clean message + assert "clean message" in caplog.text + + def test_debug_formatter_integration(self, caplog: LogCaptureFixture) -> None: + """Test DebugLogFormatter integration with actual logger.""" + logger = logging.getLogger("test_debug") + logger.handlers.clear() + + # Add handler with debug formatter + handler = logging.StreamHandler() + handler.setFormatter(DebugLogFormatter()) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + # Test logging + logger.debug("debug message") + + # caplog should capture the formatted message + assert "debug message" in caplog.text + + def test_repo_filter_integration(self, caplog: LogCaptureFixture) -> None: + """Test RepoFilter integration with actual logger.""" + logger = logging.getLogger("test_repo") + logger.handlers.clear() + logger.propagate = False # Prevent logs from going to caplog + + # Add handler with repo formatter and filter + handler = logging.StreamHandler() + handler.setFormatter(RepoLogFormatter()) + handler.addFilter(RepoFilter()) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + + # Create a log record with repo attributes + record = logging.LogRecord( + name="test_repo", + level=logging.INFO, + pathname="test.py", + lineno=1, + msg="repo operation", + args=(), + exc_info=None, + ) + record.bin_name = "git" + record.keyword = "clone" + record.message = "repo operation" # RepoLogFormatter expects this + + # This should be captured since it has keyword + logger.handle(record) + + # Regular log without repo attributes should be filtered out + logger.info("regular message") + + # The caplog should not contain the regular message due to the filter + # but may contain the repo message depending on how caplog works with filters + # For this test, we just verify that RepoFilter accepts records with keyword + repo_filter = RepoFilter() + assert repo_filter.filter(record) is True + + regular_record = logging.LogRecord( + name="test_repo", + level=logging.INFO, + pathname="test.py", + lineno=1, + msg="regular message", + args=(), + exc_info=None, + ) + assert repo_filter.filter(regular_record) is False