diff --git a/.github/linters/.ruff.toml b/.github/linters/.ruff.toml new file mode 100644 index 0000000..7750f16 --- /dev/null +++ b/.github/linters/.ruff.toml @@ -0,0 +1,14 @@ +# Ruff configuration for test fixtures +# This configuration relaxes rules for test fixtures which are +# often using non-standard import orders and mock functionality + +extend = "../../pyproject.toml" + +[lint] +ignore = [ + "E402", # Module level import not at top of file (common in test fixtures) + "F401", # Unused import (needed for test fixtures) + "F811", # Redefinition of unused name (common in test fixtures) + "F821", # Undefined name (often happens with dynamically created objects) + "F841", # Local variable assigned but never used (common in test fixtures) +] \ No newline at end of file diff --git a/.github/workflows/pr-checks.yaml b/.github/workflows/pr-checks.yaml index 492b85b..d7d4c7f 100644 --- a/.github/workflows/pr-checks.yaml +++ b/.github/workflows/pr-checks.yaml @@ -39,6 +39,14 @@ jobs: - name: Run unit tests run: uv run pytest -m unit -v + - name: Install sbctl + run: | + curl -L -o sbctl.tar.gz "https://github.com/replicatedhq/sbctl/releases/latest/download/sbctl_linux_amd64.tar.gz" + tar xzf sbctl.tar.gz + chmod +x sbctl + sudo mv sbctl /usr/local/bin/ + sbctl --version || echo "sbctl installed but version command not available" + - name: Run integration tests run: uv run pytest -m integration -v diff --git a/pyproject.toml b/pyproject.toml index 3011bdd..c2ad65c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,6 +74,15 @@ timeout = 30 [tool.ruff] line-length = 100 target-version = "py313" +exclude = [ + ".git", + ".github", + "__pycache__", + "build", + "dist", + "*.yml", + "*.yaml", +] [tool.black] line-length = 100 diff --git a/src/mcp_server_troubleshoot/lifecycle.py b/src/mcp_server_troubleshoot/lifecycle.py index 649f3ee..83b1526 100644 --- a/src/mcp_server_troubleshoot/lifecycle.py +++ b/src/mcp_server_troubleshoot/lifecycle.py @@ -205,7 +205,7 @@ def handle_signal(signum: int, frame: Any) -> None: import shutil shutil.rmtree(bundle_path) - logger.info(f"Successfully removed bundle directory") + logger.info("Successfully removed bundle directory") except Exception as e: logger.error(f"Error removing bundle directory: {e}") except Exception as e: diff --git a/tests/.ruff.toml b/tests/.ruff.toml new file mode 100644 index 0000000..09666d7 --- /dev/null +++ b/tests/.ruff.toml @@ -0,0 +1,5 @@ +# Ruff configuration for tests directory +# This applies more relaxed rules for test files which often use +# non-standard import orders, mocks, and test fixtures + +extend = "../.github/linters/.ruff.toml" \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index cab3a98..ac2f5cc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -117,12 +117,12 @@ def is_docker_available(): """Check if Podman is available on the system.""" try: result = subprocess.run( - ["podman", "--version"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + ["podman", "--version"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True, timeout=5, - check=False + check=False, ) return result.returncode == 0 except (subprocess.SubprocessError, FileNotFoundError, subprocess.TimeoutExpired): @@ -164,7 +164,7 @@ def build_container_image(project_root, use_mock_sbctl=False): stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=30, - check=False + check=False, ) # For test mode with mock sbctl, we need to modify the Containerfile @@ -222,7 +222,7 @@ def build_container_image(project_root, use_mock_sbctl=False): stderr=subprocess.PIPE, text=True, timeout=300, - check=True + check=True, ) # Clean up the temporary Containerfile @@ -237,7 +237,7 @@ def build_container_image(project_root, use_mock_sbctl=False): stderr=subprocess.PIPE, text=True, timeout=300, - check=True + check=True, ) return True, result @@ -275,15 +275,15 @@ def docker_image(request): ["podman", "image", "exists", "mcp-server-troubleshoot:latest"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, - check=False + check=False, ) - + # Image exists already, just use it if image_check.returncode == 0: print("\nUsing existing Podman image for tests...") yield "mcp-server-troubleshoot:latest" return - + # Determine if we should use mock sbctl based on markers use_mock_sbctl = request.node.get_closest_marker("mock_sbctl") is not None @@ -316,7 +316,7 @@ def docker_image(request): stderr=subprocess.PIPE, text=True, timeout=10, - check=False + check=False, ) containers = containers_result.stdout.strip().split("\n") if containers_result.stdout else [] @@ -328,7 +328,7 @@ def docker_image(request): stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=10, - check=False + check=False, ) except Exception: pass diff --git a/tests/e2e/test_container.py b/tests/e2e/test_container.py index 6e40fd0..d384c37 100644 --- a/tests/e2e/test_container.py +++ b/tests/e2e/test_container.py @@ -164,12 +164,12 @@ def test_mcp_protocol(container_setup, docker_image): ["podman", "rm", "-f", container_id], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, - check=False + check=False, ) # Start the container using run instead of Popen print(f"Starting test container: {container_id}") - + # Use detached mode to run in background container_start = subprocess.run( [ @@ -190,14 +190,14 @@ def test_mcp_protocol(container_setup, docker_image): stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, - check=False + check=False, ) - + # Print full container start output for debugging print(f"Container start stdout: {container_start.stdout}") print(f"Container start stderr: {container_start.stderr}") print(f"Container start return code: {container_start.returncode}") - + if container_start.returncode != 0: print(f"Failed to start container: {container_start.stderr}") pytest.fail(f"Failed to start container: {container_start.stderr}") @@ -213,21 +213,21 @@ def test_mcp_protocol(container_setup, docker_image): stderr=subprocess.PIPE, text=True, ) - + print(f"Container status: {ps_check.stdout}") - + # Also get logs in case it failed to start properly logs_check = subprocess.run( ["podman", "logs", container_id], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, - check=False + check=False, ) - + print(f"Container logs stdout: {logs_check.stdout}") print(f"Container logs stderr: {logs_check.stderr}") - + # Check specifically for this container running_check = subprocess.run( ["podman", "ps", "-q", "-f", f"name={container_id}"], @@ -303,7 +303,7 @@ def timeout_handler(): finally: # Clean up the container print(f"Cleaning up container: {container_id}") - + # Stop and remove the container with a more robust cleanup procedure try: # First try a normal removal @@ -312,7 +312,7 @@ def timeout_handler(): stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False, - timeout=10 + timeout=10, ) except subprocess.TimeoutExpired: # If that times out, try to kill it first @@ -322,7 +322,7 @@ def timeout_handler(): stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False, - timeout=5 + timeout=5, ) # Then try removal again subprocess.run( diff --git a/tests/e2e/test_docker.py b/tests/e2e/test_docker.py index c57cce8..1fa6f1d 100644 --- a/tests/e2e/test_docker.py +++ b/tests/e2e/test_docker.py @@ -105,7 +105,9 @@ def test_podman_build(): # Build the image with progress output print("\nBuilding Podman image...") - output = run_command(f"podman build --progress=plain -t {test_tag} -f Containerfile .", cwd=str(project_dir)) + output = run_command( + f"podman build --progress=plain -t {test_tag} -f Containerfile .", cwd=str(project_dir) + ) print(f"\nBuild output:\n{output}\n") # Check if the image exists diff --git a/tests/e2e/test_podman.py b/tests/e2e/test_podman.py index c57cce8..1fa6f1d 100644 --- a/tests/e2e/test_podman.py +++ b/tests/e2e/test_podman.py @@ -105,7 +105,9 @@ def test_podman_build(): # Build the image with progress output print("\nBuilding Podman image...") - output = run_command(f"podman build --progress=plain -t {test_tag} -f Containerfile .", cwd=str(project_dir)) + output = run_command( + f"podman build --progress=plain -t {test_tag} -f Containerfile .", cwd=str(project_dir) + ) print(f"\nBuild output:\n{output}\n") # Check if the image exists diff --git a/tests/fixtures/.ruff.toml b/tests/fixtures/.ruff.toml new file mode 100644 index 0000000..c38e99b --- /dev/null +++ b/tests/fixtures/.ruff.toml @@ -0,0 +1,5 @@ +# Ruff configuration for test fixtures +# Fixtures have special requirements and often use non-standard +# import orders and unconventional patterns for mocking + +extend = "../.ruff.toml" \ No newline at end of file diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index b7c2082..e08d07d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -9,4 +9,4 @@ from unittest.mock import AsyncMock, Mock, patch # Import TestAssertions class and fixture from unit tests -from tests.unit.conftest import TestAssertions, test_assertions \ No newline at end of file +from tests.unit.conftest import TestAssertions, test_assertions diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py deleted file mode 120000 index c0d53c7..0000000 --- a/tests/integration/test_integration.py +++ /dev/null @@ -1 +0,0 @@ -tests/integration/test_integration.py.bak \ No newline at end of file diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py new file mode 100644 index 0000000..75bdcda --- /dev/null +++ b/tests/integration/test_integration.py @@ -0,0 +1,13 @@ +""" +Integration tests for the complete MCP server integration. + +This file provides tests for the complete MCP server integration, +including the FastMCP protocol implementation. +""" + +import pytest + +# Mark all tests in this file as integration tests +pytestmark = pytest.mark.integration + +# Future integration tests will be added here as they are developed diff --git a/tests/integration/test_real_bundle.py b/tests/integration/test_real_bundle.py index b920b63..aaa0eb1 100644 --- a/tests/integration/test_real_bundle.py +++ b/tests/integration/test_real_bundle.py @@ -1,7 +1,7 @@ """ Tests for real support bundle integration. -These tests verify the behavior of the MCP server components +These tests verify the behavior of the MCP server components with actual support bundles, focusing on user-visible behavior rather than implementation details. """ diff --git a/tests/integration/test_stdio_lifecycle.py b/tests/integration/test_stdio_lifecycle.py index 8ba4ef3..da75331 100644 --- a/tests/integration/test_stdio_lifecycle.py +++ b/tests/integration/test_stdio_lifecycle.py @@ -32,11 +32,11 @@ async def test_stdio_lifecycle_docstring(): """ This test exists to document why the stdio lifecycle tests were removed. - + The stdio lifecycle functionality is now properly tested in the e2e container tests which provide a more appropriate environment. """ # This is a documentation test only - it doesn't actually test functionality # It exists to preserve the test file for documentation purposes and to show # in test collection that the stdio lifecycle tests were intentionally removed - assert True, "This test exists only for documentation purposes" \ No newline at end of file + assert True, "This test exists only for documentation purposes" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 2b9b060..8b20d24 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -14,79 +14,83 @@ # Helper functions for async tests are defined in the main conftest.py + # Define test assertion helpers class TestAssertions: """ Collection of reusable test assertion helpers for common patterns in tests. - + These utilities make assertions more consistent, reduce duplication, and provide better error messages when tests fail. """ - + @staticmethod def assert_attributes_exist(obj: Any, attributes: List[str]) -> None: """ Assert that an object has all the specified attributes. - + Args: obj: The object to check attributes: List of attribute names to verify - + Raises: AssertionError: If any attribute is missing """ for attr in attributes: assert hasattr(obj, attr), f"Object should have attribute '{attr}'" - + @staticmethod - def assert_api_response_valid(response: List[Any], expected_type: str = "text", - contains: Optional[List[str]] = None) -> None: + def assert_api_response_valid( + response: List[Any], expected_type: str = "text", contains: Optional[List[str]] = None + ) -> None: """ Assert that an MCP API response is valid and contains expected content. - + Args: response: The API response to check expected_type: Expected response type (e.g., 'text') contains: List of strings that should be in the response text - + Raises: AssertionError: If response is invalid or missing expected content """ assert isinstance(response, list), "Response should be a list" assert len(response) > 0, "Response should not be empty" - assert hasattr(response[0], 'type'), "Response item should have 'type' attribute" + assert hasattr(response[0], "type"), "Response item should have 'type' attribute" assert response[0].type == expected_type, f"Response type should be '{expected_type}'" - - if contains and hasattr(response[0], 'text'): + + if contains and hasattr(response[0], "text"): for text in contains: assert text in response[0].text, f"Response should contain '{text}'" - + @staticmethod def assert_object_matches_attrs(obj: Any, expected_attrs: Dict[str, Any]) -> None: """ Assert that an object has attributes matching expected values. - + Args: obj: The object to check expected_attrs: Dictionary of attribute names and expected values - + Raises: AssertionError: If any attribute doesn't match the expected value """ for attr, expected in expected_attrs.items(): assert hasattr(obj, attr), f"Object should have attribute '{attr}'" actual = getattr(obj, attr) - assert actual == expected, f"Attribute '{attr}' value mismatch. Expected: {expected}, Got: {actual}" + assert ( + actual == expected + ), f"Attribute '{attr}' value mismatch. Expected: {expected}, Got: {actual}" @staticmethod async def assert_asyncio_timeout(coro, timeout: float = 0.1) -> None: """ Assert that an async coroutine times out. - + Args: coro: Coroutine to execute timeout: Timeout in seconds - + Raises: AssertionError: If the coroutine doesn't time out """ @@ -99,7 +103,7 @@ async def assert_asyncio_timeout(coro, timeout: float = 0.1) -> None: def test_assertions() -> TestAssertions: """ Provides test assertion helpers for common test patterns. - + These helpers improve test readability and consistency. """ return TestAssertions @@ -109,53 +113,55 @@ def test_assertions() -> TestAssertions: class TestFactory: """ Factory functions for creating test objects with minimal boilerplate. - + These factories create common test objects with default values, allowing tests to focus on the specific values that matter for that test. """ - + @staticmethod def create_bundle_metadata( id: str = "test_bundle", source: str = "test_source", path: Optional[Path] = None, kubeconfig_path: Optional[Path] = None, - initialized: bool = True + initialized: bool = True, ) -> "BundleMetadata": """ Create a BundleMetadata instance with sensible defaults. - + Args: id: Bundle ID source: Bundle source path: Bundle path kubeconfig_path: Path to kubeconfig initialized: Whether the bundle is initialized - + Returns: BundleMetadata instance """ from mcp_server_troubleshoot.bundle import BundleMetadata - + if path is None: path = Path(tempfile.mkdtemp()) - + if kubeconfig_path is None: kubeconfig_path = path / "kubeconfig" # Create the kubeconfig file if it doesn't exist if not kubeconfig_path.exists(): kubeconfig_path.parent.mkdir(parents=True, exist_ok=True) with open(kubeconfig_path, "w") as f: - f.write('{"apiVersion": "v1", "clusters": [{"cluster": {"server": "http://localhost:8001"}}]}') - + f.write( + '{"apiVersion": "v1", "clusters": [{"cluster": {"server": "http://localhost:8001"}}]}' + ) + return BundleMetadata( id=id, source=source, path=path, kubeconfig_path=kubeconfig_path, - initialized=initialized + initialized=initialized, ) - + @staticmethod def create_kubectl_result( command: str = "get pods", @@ -163,11 +169,11 @@ def create_kubectl_result( stdout: str = '{"items": []}', stderr: str = "", is_json: bool = True, - duration_ms: int = 100 + duration_ms: int = 100, ) -> "KubectlResult": """ Create a KubectlResult instance with sensible defaults. - + Args: command: The kubectl command exit_code: Command exit code @@ -175,22 +181,23 @@ def create_kubectl_result( stderr: Command standard error is_json: Whether the output is JSON duration_ms: Command execution duration - + Returns: KubectlResult instance """ from mcp_server_troubleshoot.kubectl import KubectlResult - + # Process output based on is_json output = stdout if is_json and stdout: import json + try: output = json.loads(stdout) except json.JSONDecodeError: output = stdout is_json = False - + return KubectlResult( command=command, exit_code=exit_code, @@ -198,7 +205,7 @@ def create_kubectl_result( stderr=stderr, output=output, is_json=is_json, - duration_ms=duration_ms + duration_ms=duration_ms, ) @@ -206,7 +213,7 @@ def create_kubectl_result( def test_factory() -> TestFactory: """ Provides factory functions for creating common test objects. - + These factories reduce boilerplate in tests and ensure consistency in test object creation. """ @@ -217,50 +224,51 @@ def test_factory() -> TestFactory: def error_setup(): """ Fixture for testing error scenarios with standard error conditions. - + This fixture provides a controlled environment for testing error handling without requiring each test to set up common error conditions. - + Returns: Dictionary with common error scenarios and mock objects """ # Create test directory temp_dir = Path(tempfile.mkdtemp()) - + # Set up non-existent paths nonexistent_path = temp_dir / "nonexistent" - + # Create a directory (not a file) directory_path = temp_dir / "directory" directory_path.mkdir() - + # Create an empty file empty_file = temp_dir / "empty.txt" empty_file.touch() - + # Create a text file text_file = temp_dir / "text.txt" text_file.write_text("This is a text file\nwith multiple lines\nfor testing errors") - + # Create a binary file binary_file = temp_dir / "binary.dat" with open(binary_file, "wb") as f: f.write(b"\x00\x01\x02\x03") - + # Create a mock bundle manager that returns None for active bundle from mcp_server_troubleshoot.bundle import BundleManager + no_bundle_manager = Mock(spec=BundleManager) no_bundle_manager.get_active_bundle.return_value = None - + # Create a mock process that fails error_process = AsyncMock() error_process.returncode = 1 error_process.communicate = AsyncMock(return_value=(b"", b"Command failed with an error")) - + # Create a mock asyncio client session with errors error_session = AsyncMock() error_session.get = AsyncMock(side_effect=Exception("Connection error")) - + return { "temp_dir": temp_dir, "nonexistent_path": nonexistent_path, @@ -279,9 +287,10 @@ def error_setup(): "PathNotFoundError": "mcp_server_troubleshoot.files.PathNotFoundError", "ReadFileError": "mcp_server_troubleshoot.files.ReadFileError", "InvalidPathError": "mcp_server_troubleshoot.files.InvalidPathError", - } + }, } + @pytest.fixture def fixtures_dir() -> Path: """ @@ -294,48 +303,52 @@ def fixtures_dir() -> Path: async def mock_command_environment(fixtures_dir): """ Creates a test environment with mock sbctl and kubectl binaries. - + This fixture: 1. Creates a temporary directory for the environment 2. Sets up mock sbctl and kubectl scripts 3. Adds the mock binaries to PATH 4. Yields the temp directory and restores PATH after test - + Args: fixtures_dir: Path to the test fixtures directory (pytest fixture) - + Returns: A tuple of (temp_dir, old_path) for use in tests """ # Create a temporary directory for the environment temp_dir = Path(tempfile.mkdtemp()) - + # Set up mock sbctl and kubectl mock_sbctl_path = fixtures_dir / "mock_sbctl.py" mock_kubectl_path = fixtures_dir / "mock_kubectl.py" temp_bin_dir = temp_dir / "bin" temp_bin_dir.mkdir(exist_ok=True) - + # Create sbctl mock sbctl_link = temp_bin_dir / "sbctl" with open(sbctl_link, "w") as f: - f.write(f"""#!/bin/bash + f.write( + f"""#!/bin/bash python "{mock_sbctl_path}" "$@" -""") +""" + ) os.chmod(sbctl_link, 0o755) - + # Create kubectl mock kubectl_link = temp_bin_dir / "kubectl" with open(kubectl_link, "w") as f: - f.write(f"""#!/bin/bash + f.write( + f"""#!/bin/bash python "{mock_kubectl_path}" "$@" -""") +""" + ) os.chmod(kubectl_link, 0o755) - + # Add mock tools to PATH old_path = os.environ.get("PATH", "") os.environ["PATH"] = f"{temp_bin_dir}:{old_path}" - + try: yield temp_dir finally: @@ -348,21 +361,21 @@ async def mock_command_environment(fixtures_dir): async def mock_bundle_manager(fixtures_dir): """ Creates a mock BundleManager with controlled behavior. - + This fixture provides a consistent mock for tests that need a BundleManager but don't need to test its real functionality. - + Args: fixtures_dir: Path to the test fixtures directory (pytest fixture) - + Returns: A Mock object with the BundleManager interface """ from mcp_server_troubleshoot.bundle import BundleManager, BundleMetadata - + # Create a mock bundle manager mock_manager = Mock(spec=BundleManager) - + # Set up common attributes temp_dir = Path(tempfile.mkdtemp()) mock_bundle = BundleMetadata( @@ -370,29 +383,34 @@ async def mock_bundle_manager(fixtures_dir): source="test_source", path=temp_dir, kubeconfig_path=temp_dir / "kubeconfig", - initialized=True + initialized=True, ) - + # Create a mock kubeconfig with open(mock_bundle.kubeconfig_path, "w") as f: - f.write('{"apiVersion": "v1", "clusters": [{"cluster": {"server": "http://localhost:8001"}}]}') - + f.write( + '{"apiVersion": "v1", "clusters": [{"cluster": {"server": "http://localhost:8001"}}]}' + ) + # Set up mock methods mock_manager.get_active_bundle.return_value = mock_bundle mock_manager.is_initialized.return_value = True mock_manager.check_api_server_available = AsyncMock(return_value=True) - mock_manager.get_diagnostic_info = AsyncMock(return_value={ - "api_server_available": True, - "bundle_initialized": True, - "sbctl_available": True, - "sbctl_process_running": True - }) - + mock_manager.get_diagnostic_info = AsyncMock( + return_value={ + "api_server_available": True, + "bundle_initialized": True, + "sbctl_available": True, + "sbctl_process_running": True, + } + ) + try: yield mock_manager finally: # Clean up temporary directory import shutil + shutil.rmtree(temp_dir) @@ -400,38 +418,38 @@ async def mock_bundle_manager(fixtures_dir): def test_file_setup(): """ Creates a test directory with a variety of files for testing file operations. - + This fixture: 1. Creates a temporary directory with subdirectories 2. Populates it with different types of files (text, binary) 3. Cleans up automatically after the test - + Returns: Path to the test directory """ # Create a test directory test_dir = Path(tempfile.mkdtemp()) - + try: # Create subdirectories dir1 = test_dir / "dir1" dir1.mkdir() - + dir2 = test_dir / "dir2" dir2.mkdir() subdir = dir2 / "subdir" subdir.mkdir() - + # Create text files file1 = dir1 / "file1.txt" file1.write_text("This is file 1\nLine 2\nLine 3\n") - + file2 = dir1 / "file2.txt" file2.write_text("This is file 2\nWith some content\n") - + file3 = subdir / "file3.txt" file3.write_text("This is file 3\nIn a subdirectory\n") - + # Create a file with specific search patterns search_file = dir1 / "search.txt" search_file.write_text( @@ -441,15 +459,16 @@ def test_file_setup(): "Multiple instances of the word pattern\n" "pattern appears again here\n" ) - + # Create a binary file binary_file = test_dir / "binary_file" with open(binary_file, "wb") as f: f.write(b"\x00\x01\x02\x03\x04\x05") - + # Return the test directory yield test_dir finally: # Clean up import shutil - shutil.rmtree(test_dir) \ No newline at end of file + + shutil.rmtree(test_dir) diff --git a/tests/unit/test_files_parametrized.py b/tests/unit/test_files_parametrized.py index 9cc73b4..6e7ed03 100644 --- a/tests/unit/test_files_parametrized.py +++ b/tests/unit/test_files_parametrized.py @@ -63,15 +63,15 @@ "invalid-empty-path", "invalid-simple-traversal", "invalid-complex-traversal", - ] + ], ) def test_list_files_args_validation_parametrized(path, recursive, expected_valid): """ Test ListFilesArgs validation with parameterized test cases. - + This test covers both valid and invalid inputs in a single test, making it easier to see all validation rules and add new cases. - + Args: path: Directory path to validate recursive: Whether to recursively list files @@ -95,7 +95,12 @@ def test_list_files_args_validation_parametrized(path, recursive, expected_valid # Valid cases ("file.txt", 0, 10, True), ("dir/file.txt", 5, 15, True), - ("absolute/path/file.txt", 0, 100, True), # Note: without leading slash - the validator removes it + ( + "absolute/path/file.txt", + 0, + 100, + True, + ), # Note: without leading slash - the validator removes it # Invalid cases ("", 0, 10, False), # Empty path ("../outside.txt", 0, 10, False), # Path traversal @@ -110,14 +115,14 @@ def test_list_files_args_validation_parametrized(path, recursive, expected_valid "invalid-path-traversal", "invalid-negative-start", "invalid-negative-end", - ] + ], ) def test_read_file_args_validation_parametrized(path, start_line, end_line, expected_valid): """ Test ReadFileArgs validation with parameterized test cases. - + This test ensures both valid and invalid inputs are properly validated. - + Args: path: File path to validate start_line: Starting line number @@ -156,16 +161,16 @@ def test_read_file_args_validation_parametrized(path, start_line, end_line, expe "invalid-empty-pattern", "invalid-max-results", "invalid-path-traversal", - ] + ], ) def test_grep_files_args_validation_parametrized( pattern, path, recursive, glob_pattern, case_sensitive, max_results, expected_valid ): """ Test GrepFilesArgs validation with parameterized test cases. - + This test ensures all validation rules are properly enforced. - + Args: pattern: Search pattern path: Directory path to search @@ -213,22 +218,27 @@ def test_grep_files_args_validation_parametrized( # Error case - path doesn't exist ("nonexistent", True, False, PathNotFoundError), # Error case - path is a file, not a directory - ("file.txt", False, True, FileSystemError), # Note: Changed from ReadFileError to FileSystemError + ( + "file.txt", + False, + True, + FileSystemError, + ), # Note: Changed from ReadFileError to FileSystemError ], ids=[ "valid-directory", "invalid-nonexistent", "invalid-not-directory", - ] + ], ) async def test_file_explorer_list_files_error_handling( path, is_directory, exists, expected_error, test_file_setup, test_factory ): """ Test that the file explorer handles listing errors correctly with parameterization. - + This test verifies error conditions for directory listings are handled properly. - + Args: path: Path to list is_directory: Whether the path is a directory @@ -244,15 +254,15 @@ async def test_file_explorer_list_files_error_handling( actual_path = "dir1/file1.txt" # This exists in the test_file_setup else: actual_path = path # Use as is for non-existent paths - + # Set up the bundle manager bundle_manager = Mock(spec=BundleManager) bundle = test_factory.create_bundle_metadata(path=test_file_setup) bundle_manager.get_active_bundle.return_value = bundle - + # Create file explorer explorer = FileExplorer(bundle_manager) - + if expected_error: # Should raise an error with pytest.raises(expected_error): @@ -282,16 +292,16 @@ async def test_file_explorer_list_files_error_handling( "valid-file", "invalid-nonexistent", "invalid-directory", - ] + ], ) async def test_file_explorer_read_file_error_handling( path, exists, is_file, is_directory, expected_error, test_file_setup, test_factory ): """ Test that the file explorer handles read errors correctly with parameterization. - + This test verifies error conditions for file reading are handled properly. - + Args: path: Path to read exists: Whether the path exists @@ -305,10 +315,10 @@ async def test_file_explorer_read_file_error_handling( bundle_manager = Mock(spec=BundleManager) bundle = test_factory.create_bundle_metadata(path=test_file_setup) bundle_manager.get_active_bundle.return_value = bundle - + # Create file explorer explorer = FileExplorer(bundle_manager) - + if expected_error: # Should raise an error with pytest.raises(expected_error): @@ -340,16 +350,16 @@ async def test_file_explorer_read_file_error_handling( "match-case-insensitive", "no-match", "multiple-matches", - ] + ], ) async def test_file_explorer_grep_files_behavior( path, pattern, case_sensitive, contains_match, test_file_setup, test_factory ): """ Test grep functionality with different patterns and case sensitivity. - + This test verifies the grep behavior with different search configurations. - + Args: path: Directory path to search pattern: Search pattern @@ -362,24 +372,24 @@ async def test_file_explorer_grep_files_behavior( bundle_manager = Mock(spec=BundleManager) bundle = test_factory.create_bundle_metadata(path=test_file_setup) bundle_manager.get_active_bundle.return_value = bundle - + # Create file explorer explorer = FileExplorer(bundle_manager) - + # Run the grep operation result = await explorer.grep_files(pattern, path, True, None, case_sensitive) - + # Verify the result structure assert isinstance(result, GrepResult) assert result.pattern == pattern assert result.path == path assert result.case_sensitive == case_sensitive - + # Verify match behavior if contains_match: assert result.total_matches > 0 assert len(result.matches) > 0 - + # Verify match structure for match in result.matches: assert pattern.lower() in match.line.lower() @@ -414,14 +424,14 @@ async def test_file_explorer_grep_files_behavior( "invalid-parent-traversal", "invalid-double-traversal", "invalid-triple-traversal", - ] + ], ) def test_file_explorer_path_normalization(path, expected_traversal, test_file_setup): """ Test path normalization for security vulnerabilities. - + This test ensures that directory traversal attempts are blocked properly. - + Args: path: Path to normalize expected_traversal: Whether path contains directory traversal @@ -437,10 +447,10 @@ def test_file_explorer_path_normalization(path, expected_traversal, test_file_se initialized=True, ) bundle_manager.get_active_bundle.return_value = bundle - + # Create the explorer explorer = FileExplorer(bundle_manager) - + if expected_traversal: # Should detect traversal and raise error with pytest.raises(InvalidPathError): @@ -451,4 +461,4 @@ def test_file_explorer_path_normalization(path, expected_traversal, test_file_se assert normalized.is_absolute() assert test_file_setup in normalized.parents or normalized == test_file_setup # Make sure we're still under the test directory (not elsewhere on disk) - assert str(normalized).startswith(str(test_file_setup)) \ No newline at end of file + assert str(normalized).startswith(str(test_file_setup)) diff --git a/tests/unit/test_kubectl_parametrized.py b/tests/unit/test_kubectl_parametrized.py index b3df702..8d8ebb4 100644 --- a/tests/unit/test_kubectl_parametrized.py +++ b/tests/unit/test_kubectl_parametrized.py @@ -59,15 +59,17 @@ "invalid-delete-operation", "invalid-exec-operation", "invalid-apply-operation", - ] + ], ) -def test_kubectl_command_args_validation_parametrized(command, timeout, json_output, expected_valid): +def test_kubectl_command_args_validation_parametrized( + command, timeout, json_output, expected_valid +): """ Test KubectlCommandArgs validation with parameterized test cases. - + This test covers both valid and invalid inputs in a single test, making it easier to see all validation rules and add new cases. - + Args: command: The kubectl command to validate timeout: Command timeout in seconds @@ -98,8 +100,11 @@ def test_kubectl_command_args_validation_parametrized(command, timeout, json_out ("get pods -o wide", ["kubectl", "get", "pods", "-o", "wide"], False), # Commands with additional flags ("get pods -n default", ["kubectl", "get", "pods", "-n", "default"], True), - ("get pods --field-selector=status.phase=Running", - ["kubectl", "get", "pods", "--field-selector=status.phase=Running"], True), + ( + "get pods --field-selector=status.phase=Running", + ["kubectl", "get", "pods", "--field-selector=status.phase=Running"], + True, + ), # Query-type commands ("api-resources", ["kubectl", "api-resources"], True), ("version", ["kubectl", "version"], True), @@ -113,15 +118,15 @@ def test_kubectl_command_args_validation_parametrized(command, timeout, json_out "field-selector", "api-resources", "version", - ] + ], ) async def test_kubectl_command_execution_parameters(command, expected_args, add_json, test_factory): """ Test that the kubectl executor handles different command formats correctly. - + This test ensures the command is properly parsed and executed for various command patterns with different options. - + Args: command: The kubectl command to execute expected_args: Expected command arguments list @@ -130,44 +135,44 @@ async def test_kubectl_command_execution_parameters(command, expected_args, add_ """ # Create a bundle for testing bundle = test_factory.create_bundle_metadata() - + # Create mock objects for testing mock_process = AsyncMock() mock_process.returncode = 0 mock_process.communicate = AsyncMock(return_value=(b'{"items": []}', b"")) - + # Create the executor with a mock bundle manager bundle_manager = Mock(spec=BundleManager) bundle_manager.get_active_bundle.return_value = bundle executor = KubectlExecutor(bundle_manager) - + # If we should add JSON format, add it to the expected args if add_json: expected_args.extend(["-o", "json"]) - + # Mock the create_subprocess_exec function with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: # Execute the command result = await executor._run_kubectl_command(command, bundle, 30, True) - + # Verify the command was constructed correctly mock_exec.assert_awaited_once() args = mock_exec.call_args[0] - + # Verify each argument matches the expected value for i, arg in enumerate(expected_args): assert args[i] == arg, f"Argument {i} should be '{arg}', got '{args[i]}'" - + # Verify the result structure assert result.exit_code == 0 assert isinstance(result.stdout, str) assert isinstance(result.stderr, str) - + # Verify JSON handling if add_json: assert result.is_json is True assert isinstance(result.output, dict) - + # Verify timing information assert isinstance(result.duration_ms, int) assert result.duration_ms >= 0 @@ -191,17 +196,17 @@ async def test_kubectl_command_execution_parameters(command, expected_args, add_ "error-resource-not-found", "error-unknown-flag", "error-command-not-found", - ] + ], ) async def test_kubectl_error_handling( return_code, stdout_content, stderr_content, expected_exit_code, should_raise, test_factory ): """ Test that the kubectl executor handles errors correctly. - + This test verifies that command failures are handled properly, with appropriate errors raised and error information preserved. - + Args: return_code: The command return code stdout_content: Command standard output @@ -212,33 +217,33 @@ async def test_kubectl_error_handling( """ # Create a bundle for testing bundle = test_factory.create_bundle_metadata() - + # Create mock objects for testing mock_process = AsyncMock() mock_process.returncode = return_code mock_process.communicate = AsyncMock( return_value=(stdout_content.encode(), stderr_content.encode()) ) - + # Create the executor with a mock bundle manager bundle_manager = Mock(spec=BundleManager) bundle_manager.get_active_bundle.return_value = bundle executor = KubectlExecutor(bundle_manager) - + # Test command execution with patch("asyncio.create_subprocess_exec", return_value=mock_process): if should_raise: # Should raise KubectlError with pytest.raises(KubectlError) as excinfo: await executor._run_kubectl_command("get pods", bundle, 30, True) - + # Verify error details assert excinfo.value.exit_code == expected_exit_code assert stderr_content in excinfo.value.stderr else: # Should succeed result = await executor._run_kubectl_command("get pods", bundle, 30, True) - + # Verify result details assert result.exit_code == expected_exit_code assert result.stdout == stdout_content @@ -249,45 +254,45 @@ async def test_kubectl_error_handling( async def test_kubectl_timeout_behavior(test_assertions, test_factory): """ Test that the kubectl executor properly handles command timeouts. - + This test verifies that: 1. Commands that exceed their timeout are properly terminated 2. KubectlError is raised with the correct error information 3. The process is killed to prevent resource leaks - + Args: test_assertions: Assertions helper fixture test_factory: Factory fixture for test objects """ # Create a bundle for testing bundle = test_factory.create_bundle_metadata() - + # Create a mock process mock_process = AsyncMock() mock_process.returncode = 0 - + # Create a function that hangs to simulate a timeout async def hang_forever(): await asyncio.sleep(30) # Much longer than our timeout return (b"", b"") - + mock_process.communicate = AsyncMock(side_effect=hang_forever) mock_process.kill = Mock() - + # Create the executor bundle_manager = Mock(spec=BundleManager) bundle_manager.get_active_bundle.return_value = bundle executor = KubectlExecutor(bundle_manager) - + # Test with a very short timeout with patch("asyncio.create_subprocess_exec", return_value=mock_process): with pytest.raises(KubectlError) as excinfo: await executor._run_kubectl_command("get pods", bundle, 0.1, True) - + # Verify error details assert "timed out" in str(excinfo.value).lower() assert excinfo.value.exit_code == 124 # Standard timeout exit code - + # Verify the process was killed mock_process.kill.assert_called_once() @@ -296,19 +301,19 @@ async def hang_forever(): async def test_kubectl_response_parsing(test_assertions, test_factory): """ Test that kubectl output is properly parsed based on format. - + This test verifies: 1. JSON output is properly parsed into Python objects 2. Non-JSON output is handled correctly 3. JSON parsing errors are handled gracefully - + Args: test_assertions: Assertions helper fixture test_factory: Factory fixture for test objects """ # Create a kubectl executor for testing executor = KubectlExecutor(Mock(spec=BundleManager)) - + # Test cases for output processing test_cases = [ # Valid JSON @@ -347,20 +352,20 @@ async def test_kubectl_response_parsing(test_assertions, test_factory): "expected_type": str, }, ] - + # Test each case for i, case in enumerate(test_cases): processed, is_json = executor._process_output(case["output"], case["try_json"]) - + # Assert the output format was detected correctly assert is_json == case["expected_is_json"], f"Case {i}: JSON detection failed" - + # Assert the output was processed to the right type assert isinstance(processed, case["expected_type"]), f"Case {i}: Wrong output type" - + # For JSON outputs, verify structure if case["expected_is_json"]: if isinstance(processed, dict) and "items" in processed: assert isinstance(processed["items"], list) elif isinstance(processed, list): - assert all(isinstance(item, dict) for item in processed) \ No newline at end of file + assert all(isinstance(item, dict) for item in processed) diff --git a/tests/unit/test_list_bundles.py b/tests/unit/test_list_bundles.py index 742ee97..4dc0795 100644 --- a/tests/unit/test_list_bundles.py +++ b/tests/unit/test_list_bundles.py @@ -164,7 +164,7 @@ async def test_bundle_validity_checker( @pytest.mark.asyncio async def test_relative_path_initialization(temp_bundle_dir, mock_valid_bundle): """Test that a bundle can be initialized using the relative path. - + This test verifies the user workflow of: 1. Listing available bundles 2. Using the relative_path from the bundle listing to initialize a bundle @@ -173,20 +173,20 @@ async def test_relative_path_initialization(temp_bundle_dir, mock_valid_bundle): import logging import os from unittest.mock import patch - + logger = logging.getLogger(__name__) - + # Create a bundle manager with our test directory bundle_manager = BundleManager(temp_bundle_dir) - + # List available bundles - this is the first step in the user workflow bundles = await bundle_manager.list_available_bundles() assert len(bundles) == 1 - + # Get the relative path - this is what a user would use from the UI relative_path = bundles[0].relative_path assert relative_path == "valid_bundle.tar.gz" - + # Instead of monkeypatching internal methods, we'll mock at a higher level # This focuses on the behavior (initializing a bundle) rather than implementation with patch.object(bundle_manager, "_initialize_with_sbctl", autospec=False) as mock_init: @@ -200,21 +200,21 @@ async def side_effect(bundle_path, output_dir): with open(kubeconfig_path, "w") as f: f.write("{}") return kubeconfig_path - + mock_init.side_effect = side_effect - + # Test initializing with relative path (the actual user workflow) metadata = await bundle_manager.initialize_bundle(relative_path) - + # Verify the behavior (not implementation details) assert metadata is not None assert metadata.initialized is True assert metadata.source == relative_path assert metadata.kubeconfig_path.exists() - + # Clean up for the next part of the test await bundle_manager._cleanup_active_bundle() - + # Test also works with full path metadata = await bundle_manager.initialize_bundle(str(mock_valid_bundle)) assert metadata is not None @@ -225,7 +225,7 @@ async def side_effect(bundle_path, output_dir): @pytest.mark.asyncio async def test_bundle_path_resolution_behavior(temp_bundle_dir, mock_valid_bundle): """Test that the bundle manager correctly resolves different path formats. - + This test verifies the behavior of the bundle path resolution logic: 1. Absolute paths are used directly 2. Relative paths are resolved within the bundle directory @@ -233,10 +233,10 @@ async def test_bundle_path_resolution_behavior(temp_bundle_dir, mock_valid_bundl """ import os from unittest.mock import patch - + # Create the bundle manager bundle_manager = BundleManager(temp_bundle_dir) - + # Create patch for _initialize_with_sbctl to avoid actual initialization with patch.object(bundle_manager, "_initialize_with_sbctl", autospec=False) as mock_init: # Set up the mock to return a valid kubeconfig path @@ -246,28 +246,29 @@ async def side_effect(bundle_path, output_dir): with open(kubeconfig_path, "w") as f: f.write("{}") return kubeconfig_path - + mock_init.side_effect = side_effect - + # Test 1: Absolute path - should be used directly metadata = await bundle_manager.initialize_bundle(str(mock_valid_bundle)) assert metadata.source == str(mock_valid_bundle) await bundle_manager._cleanup_active_bundle() - + # Test 2: Relative path - should be resolved within bundle directory # Create a subdirectory and move the bundle there subdir = temp_bundle_dir / "subdir" os.makedirs(subdir, exist_ok=True) rel_bundle = subdir / "subdir_bundle.tar.gz" import shutil + shutil.copy(mock_valid_bundle, rel_bundle) - + # Now try to initialize with a relative path from the bundle_dir rel_path = "subdir/subdir_bundle.tar.gz" metadata = await bundle_manager.initialize_bundle(rel_path) assert metadata.source == rel_path await bundle_manager._cleanup_active_bundle() - + # Test 3: Just filename - should be found within bundle directory metadata = await bundle_manager.initialize_bundle("valid_bundle.tar.gz") assert metadata.source == "valid_bundle.tar.gz" diff --git a/tests/unit/test_server_parametrized.py b/tests/unit/test_server_parametrized.py index 7c8e857..cbca8ed 100644 --- a/tests/unit/test_server_parametrized.py +++ b/tests/unit/test_server_parametrized.py @@ -58,39 +58,29 @@ "source,force,api_available,expected_strings", [ # Success case - all good - ( - "test_bundle.tar.gz", - False, - True, - ["Bundle initialized successfully", "test_bundle"] - ), + ("test_bundle.tar.gz", False, True, ["Bundle initialized successfully", "test_bundle"]), # Success case - force initialization - ( - "test_bundle.tar.gz", - True, - True, - ["Bundle initialized successfully", "test_bundle"] - ), + ("test_bundle.tar.gz", True, True, ["Bundle initialized successfully", "test_bundle"]), # Warning case - API server not available ( - "test_bundle.tar.gz", - False, - False, - ["Bundle initialized but API server is NOT available", "kubectl commands may fail"] + "test_bundle.tar.gz", + False, + False, + ["Bundle initialized but API server is NOT available", "kubectl commands may fail"], ), ], ids=[ "success-normal", "success-force", "warning-api-unavailable", - ] + ], ) async def test_initialize_bundle_tool_parametrized( source, force, api_available, expected_strings, test_assertions, test_factory ): """ Test the initialize_bundle tool with different inputs. - + Args: source: Bundle source force: Whether to force initialization @@ -106,7 +96,7 @@ async def test_initialize_bundle_tool_parametrized( id="test_bundle", source=temp_file.name, ) - + # Create a mock for the bundle manager with patch("mcp_server_troubleshoot.server.get_bundle_manager") as mock_get_manager: mock_manager = Mock() @@ -115,19 +105,20 @@ async def test_initialize_bundle_tool_parametrized( mock_manager.check_api_server_available = AsyncMock(return_value=api_available) mock_manager.get_diagnostic_info = AsyncMock(return_value={}) mock_get_manager.return_value = mock_manager - + # Create InitializeBundleArgs instance from mcp_server_troubleshoot.bundle import InitializeBundleArgs + args = InitializeBundleArgs(source=temp_file.name, force=force) - + # Call the tool function response = await initialize_bundle(args) - + # Verify method calls mock_manager._check_sbctl_available.assert_awaited_once() mock_manager.initialize_bundle.assert_awaited_once_with(temp_file.name, force) mock_manager.check_api_server_available.assert_awaited_once() - + # Use the test assertion helper to verify response test_assertions.assert_api_response_valid(response, "text", expected_strings) @@ -138,45 +129,44 @@ async def test_initialize_bundle_tool_parametrized( [ # Success case - JSON output ( - "get pods", - 30, - True, - 0, - '{"items": []}', - ["kubectl command executed successfully", "items", "Command metadata"] + "get pods", + 30, + True, + 0, + '{"items": []}', + ["kubectl command executed successfully", "items", "Command metadata"], ), # Success case - text output ( - "get pods", - 30, - False, - 0, - "NAME READY STATUS", - ["kubectl command executed successfully", "NAME READY STATUS"] + "get pods", + 30, + False, + 0, + "NAME READY STATUS", + ["kubectl command executed successfully", "NAME READY STATUS"], ), # Error case - command failed - ( - "get invalid", - 30, - True, - 1, - "", - ["kubectl command failed", "exit code 1"] - ), + ("get invalid", 30, True, 1, "", ["kubectl command failed", "exit code 1"]), ], ids=[ "success-json", "success-text", "error-command-failed", - ] + ], ) async def test_kubectl_tool_parametrized( - command, timeout, json_output, result_exit_code, result_stdout, - expected_strings, test_assertions, test_factory + command, + timeout, + json_output, + result_exit_code, + result_stdout, + expected_strings, + test_assertions, + test_factory, ): """ Test the kubectl tool with different inputs. - + Args: command: kubectl command timeout: Command timeout @@ -194,9 +184,9 @@ async def test_kubectl_tool_parametrized( stdout=result_stdout, stderr="", is_json=json_output and result_exit_code == 0, # Only JSON for success cases - duration_ms=100 + duration_ms=100, ) - + # Set up the mocks with patch("mcp_server_troubleshoot.server.get_bundle_manager") as mock_get_manager: mock_manager = Mock() @@ -204,36 +194,40 @@ async def test_kubectl_tool_parametrized( # Add diagnostic info mock to avoid diagnostics error mock_manager.get_diagnostic_info = AsyncMock(return_value={"api_server_available": True}) mock_get_manager.return_value = mock_manager - + with patch("mcp_server_troubleshoot.server.get_kubectl_executor") as mock_get_executor: mock_executor = Mock() - + # For error cases, raise an exception if result_exit_code != 0: from mcp_server_troubleshoot.kubectl import KubectlError + mock_executor.execute = AsyncMock( - side_effect=KubectlError(f"kubectl command failed: {command}", result_exit_code, "") + side_effect=KubectlError( + f"kubectl command failed: {command}", result_exit_code, "" + ) ) else: # For success cases, return the mock result mock_executor.execute = AsyncMock(return_value=mock_result) - + mock_get_executor.return_value = mock_executor - + # Create KubectlCommandArgs instance from mcp_server_troubleshoot.kubectl import KubectlCommandArgs + args = KubectlCommandArgs(command=command, timeout=timeout, json_output=json_output) - + # Call the tool function response = await kubectl(args) - + # Verify API check called mock_manager.check_api_server_available.assert_awaited_once() - + # For success cases, verify kubectl execution if result_exit_code == 0: mock_executor.execute.assert_awaited_once_with(command, timeout, json_output) - + # Use the test assertion helper to verify response test_assertions.assert_api_response_valid(response, "text", expected_strings) @@ -247,7 +241,7 @@ async def test_kubectl_tool_parametrized( "list_files", {"path": "dir1", "recursive": False}, FileListResult( - path="dir1", + path="dir1", entries=[ FileInfo( name="file1.txt", @@ -258,12 +252,12 @@ async def test_kubectl_tool_parametrized( modify_time=123456789.0, is_binary=False, ) - ], - recursive=False, - total_files=1, - total_dirs=0 + ], + recursive=False, + total_files=1, + total_dirs=0, ), - ["Listed files", "file1.txt", "total_files", "total_dirs"] + ["Listed files", "file1.txt", "total_files", "total_dirs"], ), # Test 2: read_file ( @@ -277,13 +271,19 @@ async def test_kubectl_tool_parametrized( total_lines=1, binary=False, ), - ["Read text file", "This is the file content"] + ["Read text file", "This is the file content"], ), # Test 3: grep_files ( "grep_files", - {"pattern": "pattern", "path": "dir1", "recursive": True, - "glob_pattern": "*.txt", "case_sensitive": False, "max_results": 100}, + { + "pattern": "pattern", + "path": "dir1", + "recursive": True, + "glob_pattern": "*.txt", + "case_sensitive": False, + "max_results": 100, + }, GrepResult( pattern="pattern", path="dir1", @@ -302,13 +302,19 @@ async def test_kubectl_tool_parametrized( case_sensitive=False, truncated=False, ), - ["Found 1 matches", "This contains pattern", "total_matches"] + ["Found 1 matches", "This contains pattern", "total_matches"], ), # Test 4: grep_files (multiple matches) ( "grep_files", - {"pattern": "common", "path": ".", "recursive": True, - "glob_pattern": "*.txt", "case_sensitive": False, "max_results": 100}, + { + "pattern": "common", + "path": ".", + "recursive": True, + "glob_pattern": "*.txt", + "case_sensitive": False, + "max_results": 100, + }, GrepResult( pattern="common", path=".", @@ -334,7 +340,7 @@ async def test_kubectl_tool_parametrized( case_sensitive=False, truncated=False, ), - ["Found 2 matches", "This has common text", "More common text"] + ["Found 2 matches", "This has common text", "More common text"], ), ], ids=[ @@ -342,14 +348,14 @@ async def test_kubectl_tool_parametrized( "read-file", "grep-files-single-match", "grep-files-multiple-matches", - ] + ], ) async def test_file_operations_parametrized( file_operation, args, result, expected_strings, test_assertions ): """ Test file operation tools with different inputs and expected results. - + Args: file_operation: Operation to test (list_files, read_file, grep_files) args: Arguments for the operation @@ -361,34 +367,41 @@ async def test_file_operations_parametrized( with patch("mcp_server_troubleshoot.server.get_file_explorer") as mock_get_explorer: mock_explorer = Mock() mock_get_explorer.return_value = mock_explorer - + # Set up the mock result based on the operation if file_operation == "list_files": mock_explorer.list_files = AsyncMock(return_value=result) from mcp_server_troubleshoot.files import ListFilesArgs + operation_args = ListFilesArgs(**args) response = await list_files(operation_args) mock_explorer.list_files.assert_awaited_once_with(args["path"], args["recursive"]) - + elif file_operation == "read_file": mock_explorer.read_file = AsyncMock(return_value=result) from mcp_server_troubleshoot.files import ReadFileArgs + operation_args = ReadFileArgs(**args) response = await read_file(operation_args) mock_explorer.read_file.assert_awaited_once_with( args["path"], args["start_line"], args["end_line"] ) - + elif file_operation == "grep_files": mock_explorer.grep_files = AsyncMock(return_value=result) from mcp_server_troubleshoot.files import GrepFilesArgs + operation_args = GrepFilesArgs(**args) response = await grep_files(operation_args) mock_explorer.grep_files.assert_awaited_once_with( - args["pattern"], args["path"], args["recursive"], - args["glob_pattern"], args["case_sensitive"], args["max_results"] + args["pattern"], + args["path"], + args["recursive"], + args["glob_pattern"], + args["case_sensitive"], + args["max_results"], ) - + # Use the test assertion helper to verify response test_assertions.assert_api_response_valid(response, "text", expected_strings) @@ -401,33 +414,33 @@ async def test_file_operations_parametrized( ( FileSystemError, "File not found: test.txt", - ["File system error", "File not found: test.txt"] + ["File system error", "File not found: test.txt"], ), # Path not found errors ( PathNotFoundError, "Path /nonexistent does not exist", - ["File system error", "Path /nonexistent does not exist"] + ["File system error", "Path /nonexistent does not exist"], ), # Bundle manager errors ( BundleManagerError, "No active bundle initialized", - ["Bundle error", "No active bundle initialized"] + ["Bundle error", "No active bundle initialized"], ), ], ids=[ "filesystem-error", "path-not-found", "bundle-manager-error", - ] + ], ) async def test_file_operations_error_handling( error_type, error_message, expected_strings, test_assertions ): """ Test that file operation tools properly handle various error types. - + Args: error_type: Type of error to simulate error_message: Error message to include @@ -441,20 +454,20 @@ async def test_file_operations_error_handling( mock_explorer.read_file = AsyncMock(side_effect=error_type(error_message)) mock_explorer.grep_files = AsyncMock(side_effect=error_type(error_message)) mock_get_explorer.return_value = mock_explorer - + # Test all three file operations with the same error from mcp_server_troubleshoot.files import ListFilesArgs, ReadFileArgs, GrepFilesArgs - + # 1. Test list_files list_args = ListFilesArgs(path="test/path") list_response = await list_files(list_args) test_assertions.assert_api_response_valid(list_response, "text", expected_strings) - + # 2. Test read_file read_args = ReadFileArgs(path="test/file.txt") read_response = await read_file(read_args) test_assertions.assert_api_response_valid(read_response, "text", expected_strings) - + # 3. Test grep_files grep_args = GrepFilesArgs(pattern="test", path="test/path") grep_response = await grep_files(grep_args) @@ -466,36 +479,24 @@ async def test_file_operations_error_handling( "include_invalid,bundles_available,expected_strings", [ # With bundles available - ( - False, - True, - ["support-bundle-1.tar.gz", "Usage Instructions", "initialize_bundle"] - ), + (False, True, ["support-bundle-1.tar.gz", "Usage Instructions", "initialize_bundle"]), # No bundles available - ( - False, - False, - ["No support bundles found", "download or transfer a bundle"] - ), + (False, False, ["No support bundles found", "download or transfer a bundle"]), # With invalid bundles included - ( - True, - True, - ["support-bundle-1.tar.gz", "validation_message", "initialize_bundle"] - ), + (True, True, ["support-bundle-1.tar.gz", "validation_message", "initialize_bundle"]), ], ids=[ "with-bundles", "no-bundles", "with-invalid-bundles", - ] + ], ) async def test_list_available_bundles_parametrized( include_invalid, bundles_available, expected_strings, test_assertions, test_factory ): """ Test the list_available_bundles tool with different scenarios. - + Args: include_invalid: Whether to include invalid bundles bundles_available: Whether any bundles are available @@ -505,7 +506,7 @@ async def test_list_available_bundles_parametrized( """ # Set up a custom class for testing from dataclasses import dataclass - + @dataclass class MockAvailableBundle: name: str @@ -515,12 +516,12 @@ class MockAvailableBundle: modified_time: float valid: bool validation_message: str = None - + # Set up mock for BundleManager with patch("mcp_server_troubleshoot.server.get_bundle_manager") as mock_get_manager: bundle_manager = Mock() mock_get_manager.return_value = bundle_manager - + # Create test bundles if bundles_available: bundles = [ @@ -533,7 +534,7 @@ class MockAvailableBundle: valid=True, ), ] - + # Add an invalid bundle if include_invalid is True if include_invalid: bundles.append( @@ -549,20 +550,21 @@ class MockAvailableBundle: ) else: bundles = [] - + # Set up the mock return value bundle_manager.list_available_bundles = AsyncMock(return_value=bundles) - + # Create ListAvailableBundlesArgs instance from mcp_server_troubleshoot.bundle import ListAvailableBundlesArgs + args = ListAvailableBundlesArgs(include_invalid=include_invalid) - + # Call the tool function response = await list_available_bundles(args) - + # Verify method call bundle_manager.list_available_bundles.assert_awaited_once_with(include_invalid) - + # Use the test assertion helper to verify response test_assertions.assert_api_response_valid(response, "text", expected_strings) @@ -571,79 +573,82 @@ class MockAvailableBundle: async def test_cleanup_resources(test_assertions): """ Test that the cleanup_resources function properly cleans up bundle manager resources. - + This test verifies: 1. The global shutdown flag is set 2. The bundle manager cleanup method is called 3. Multiple cleanup calls are handled correctly - + Args: test_assertions: Assertions helper fixture """ # Mock both app_context and legacy bundle manager - with patch("mcp_server_troubleshoot.server.get_app_context") as mock_get_context, \ - patch("mcp_server_troubleshoot.server.globals") as mock_globals: - + with ( + patch("mcp_server_troubleshoot.server.get_app_context") as mock_get_context, + patch("mcp_server_troubleshoot.server.globals") as mock_globals, + ): + # Reset shutdown flag import mcp_server_troubleshoot.server + mcp_server_troubleshoot.server._is_shutting_down = False - + # Setup app context mode mock_app_context = AsyncMock() mock_app_context.bundle_manager = AsyncMock() mock_app_context.bundle_manager.cleanup = AsyncMock() - + # Set return value for get_app_context mock_get_context.return_value = mock_app_context - + # Mock globals for legacy mode mock_globals.return_value = { "_bundle_manager": None # Not used in this test since we test app_context mode } - + # Call cleanup_resources await cleanup_resources() - + # Verify cleanup was called on app context bundle manager mock_app_context.bundle_manager.cleanup.assert_awaited_once() - + # Verify shutdown flag was set assert mcp_server_troubleshoot.server._is_shutting_down is True - + # Reset mock mock_app_context.bundle_manager.cleanup.reset_mock() - + # Call cleanup_resources again (should not call cleanup again) await cleanup_resources() - + # Verify cleanup was not called again mock_app_context.bundle_manager.cleanup.assert_not_awaited() - + # Now test legacy mode - with patch("mcp_server_troubleshoot.server.get_app_context") as mock_get_context, \ - patch("mcp_server_troubleshoot.server.globals") as mock_globals: - + with ( + patch("mcp_server_troubleshoot.server.get_app_context") as mock_get_context, + patch("mcp_server_troubleshoot.server.globals") as mock_globals, + ): + # Reset shutdown flag mcp_server_troubleshoot.server._is_shutting_down = False - + # Setup legacy mode (no app context) mock_get_context.return_value = None - + # Setup legacy bundle manager mock_bundle_manager = AsyncMock() mock_bundle_manager.cleanup = AsyncMock() - + # Mock globals for legacy mode - mock_globals.return_value = { - "_bundle_manager": mock_bundle_manager - } - + mock_globals.return_value = {"_bundle_manager": mock_bundle_manager} + # Call cleanup_resources await cleanup_resources() - + # Verify cleanup was called on legacy bundle manager mock_bundle_manager.cleanup.assert_awaited_once() - + # Verify shutdown flag was set assert mcp_server_troubleshoot.server._is_shutting_down is True @@ -652,7 +657,7 @@ async def test_cleanup_resources(test_assertions): async def test_register_signal_handlers(): """ Test that the register_signal_handlers function properly sets up handlers for signals. - + This test verifies: 1. Signal handlers are registered for SIGINT and SIGTERM 2. The event loop's add_signal_handler method is called @@ -663,12 +668,13 @@ async def test_register_signal_handlers(): mock_get_loop.return_value = mock_loop mock_loop.is_closed.return_value = False mock_loop.add_signal_handler = Mock() - + # Call register_signal_handlers register_signal_handlers() - + # Verify add_signal_handler was called for each signal import signal + if hasattr(signal, "SIGTERM"): # Check for POSIX signals assert mock_loop.add_signal_handler.call_count >= 1 else: # Windows @@ -679,45 +685,49 @@ async def test_register_signal_handlers(): async def test_shutdown_function(): """ Test that the shutdown function properly triggers cleanup process. - + This test verifies: 1. In an async context, cleanup_resources is called as a task 2. In a non-async context, a new event loop is created 3. Cleanup is properly called in both cases """ # Test case 1: With running loop (async context) - with patch("asyncio.get_running_loop") as mock_get_loop, \ - patch("asyncio.create_task") as mock_create_task, \ - patch("mcp_server_troubleshoot.server.cleanup_resources"): - + with ( + patch("asyncio.get_running_loop") as mock_get_loop, + patch("asyncio.create_task") as mock_create_task, + patch("mcp_server_troubleshoot.server.cleanup_resources"), + ): + mock_loop = Mock() mock_get_loop.return_value = mock_loop mock_loop.is_closed.return_value = False - + # Call shutdown shutdown() - + # Verify create_task was called mock_create_task.assert_called_once() - + # Test case 2: Without running loop (non-async context) - with patch("asyncio.get_running_loop", side_effect=RuntimeError("No running loop")), \ - patch("asyncio.new_event_loop") as mock_new_loop, \ - patch("asyncio.set_event_loop") as mock_set_loop, \ - patch("mcp_server_troubleshoot.server.cleanup_resources"): - + with ( + patch("asyncio.get_running_loop", side_effect=RuntimeError("No running loop")), + patch("asyncio.new_event_loop") as mock_new_loop, + patch("asyncio.set_event_loop") as mock_set_loop, + patch("mcp_server_troubleshoot.server.cleanup_resources"), + ): + mock_loop = Mock() mock_new_loop.return_value = mock_loop - + # Call shutdown shutdown() - + # Verify new_event_loop and set_event_loop were called mock_new_loop.assert_called_once() mock_set_loop.assert_called_once_with(mock_loop) - + # Verify run_until_complete was called mock_loop.run_until_complete.assert_called_once() - + # Verify loop was closed - mock_loop.close.assert_called_once() \ No newline at end of file + mock_loop.close.assert_called_once()