diff --git a/packages/x86_64/APKINDEX.tar.gz b/packages/x86_64/APKINDEX.tar.gz index b3c12ff..cfed3bc 100644 Binary files a/packages/x86_64/APKINDEX.tar.gz and b/packages/x86_64/APKINDEX.tar.gz differ diff --git a/packages/x86_64/troubleshoot-mcp-server-0.1.0-r0.apk b/packages/x86_64/troubleshoot-mcp-server-0.1.0-r0.apk index 6e89965..c8c87e0 100644 Binary files a/packages/x86_64/troubleshoot-mcp-server-0.1.0-r0.apk and b/packages/x86_64/troubleshoot-mcp-server-0.1.0-r0.apk differ diff --git a/src/mcp_server_troubleshoot/server.py b/src/mcp_server_troubleshoot/server.py index 7af7398..7c9edf5 100644 --- a/src/mcp_server_troubleshoot/server.py +++ b/src/mcp_server_troubleshoot/server.py @@ -15,16 +15,11 @@ from .bundle import ( BundleManager, BundleManagerError, - InitializeBundleArgs, - ListAvailableBundlesArgs, ) -from .kubectl import KubectlError, KubectlExecutor, KubectlCommandArgs +from .kubectl import KubectlError, KubectlExecutor from .files import ( FileExplorer, FileSystemError, - GrepFilesArgs, - ListFilesArgs, - ReadFileArgs, ) from .lifecycle import app_lifespan, AppContext from .formatters import get_formatter, ResponseFormatter @@ -144,25 +139,26 @@ def check_response_size( @mcp.tool() -async def initialize_bundle(args: InitializeBundleArgs) -> List[TextContent]: +async def initialize_bundle( + source: str, force: bool = False, verbosity: Optional[str] = None +) -> List[TextContent]: """ Initialize a Kubernetes support bundle for analysis. This tool loads a bundle and makes it available for exploration with other tools. Args: - args: Arguments containing: - source: (string, required) The source of the bundle (URL or local file path) - force: (boolean, optional) Whether to force re-initialization if a bundle - is already active. Defaults to False. - verbosity: (string, optional) Verbosity level for response formatting - (minimal|standard|verbose|debug). Defaults to minimal. + source: (string, required) The source of the bundle (URL or local file path) + force: (boolean, optional) Whether to force re-initialization if a bundle + is already active. Defaults to False. + verbosity: (string, optional) Verbosity level for response formatting + (minimal|standard|verbose|debug). Defaults to minimal. Returns: Metadata about the initialized bundle including path and kubeconfig location. If the API server is not available, also returns diagnostic information. """ bundle_manager = get_bundle_manager() - formatter = get_formatter(args.verbosity) + formatter = get_formatter(verbosity) try: # Check if sbctl is available before attempting to initialize @@ -174,7 +170,7 @@ async def initialize_bundle(args: InitializeBundleArgs) -> List[TextContent]: return [TextContent(type="text", text=formatted_error)] # Initialize the bundle - result = await bundle_manager.initialize_bundle(args.source, args.force) + result = await bundle_manager.initialize_bundle(source, force) # Check if the API server is available api_server_available = await bundle_manager.check_api_server_available() @@ -215,28 +211,29 @@ async def initialize_bundle(args: InitializeBundleArgs) -> List[TextContent]: @mcp.tool() -async def list_available_bundles(args: ListAvailableBundlesArgs) -> List[TextContent]: +async def list_available_bundles( + include_invalid: bool = False, verbosity: Optional[str] = None +) -> List[TextContent]: """ Scan the bundle storage directory to find available compressed bundle files and list them. This tool helps discover which bundles are available for initialization. Args: - args: Arguments containing: - include_invalid: (boolean, optional) Whether to include invalid or inaccessible - bundles in the results. Defaults to False. - verbosity: (string, optional) Verbosity level for response formatting - (minimal|standard|verbose|debug). Defaults to minimal. + include_invalid: (boolean, optional) Whether to include invalid or inaccessible + bundles in the results. Defaults to False. + verbosity: (string, optional) Verbosity level for response formatting + (minimal|standard|verbose|debug). Defaults to minimal. Returns: A list of available bundle files with details including path, size, and modification time. Bundles are validated to ensure they have the expected support bundle structure. """ bundle_manager = get_bundle_manager() - formatter = get_formatter(args.verbosity) + formatter = get_formatter(verbosity) try: # List available bundles - bundles = await bundle_manager.list_available_bundles(args.include_invalid) + bundles = await bundle_manager.list_available_bundles(include_invalid) response = formatter.format_bundle_list(bundles) return check_response_size(response, "list_bundles", formatter) @@ -254,20 +251,21 @@ async def list_available_bundles(args: ListAvailableBundlesArgs) -> List[TextCon @mcp.tool() -async def kubectl(args: KubectlCommandArgs) -> List[TextContent]: +async def kubectl( + command: str, timeout: int = 30, json_output: bool = False, verbosity: Optional[str] = None +) -> List[TextContent]: """ Execute kubectl commands against the initialized bundle's API server. Allows running Kubernetes CLI commands to explore resources in the support bundle. Args: - args: Arguments containing: - command: (string, required) The kubectl command to execute (e.g., "get pods", - "get nodes -o wide", "describe deployment nginx") - timeout: (integer, optional) Timeout in seconds for the command. Defaults to 30. - json_output: (boolean, optional) Whether to format the output as JSON. - Defaults to True. Set to False for plain text output. - verbosity: (string, optional) Verbosity level for response formatting - (minimal|standard|verbose|debug). Defaults to minimal. + command: (string, required) The kubectl command to execute (e.g., "get pods", + "get nodes -o wide", "describe deployment nginx") + timeout: (integer, optional) Timeout in seconds for the command. Defaults to 30. + json_output: (boolean, optional) Whether to format the output as JSON. + Defaults to False. Set to True for JSON output. + verbosity: (string, optional) Verbosity level for response formatting + (minimal|standard|verbose|debug). Defaults to minimal. Returns: The formatted output from the kubectl command, along with execution metadata @@ -275,7 +273,7 @@ async def kubectl(args: KubectlCommandArgs) -> List[TextContent]: information if the command fails or API server is not available. """ bundle_manager = get_bundle_manager() - formatter = get_formatter(args.verbosity) + formatter = get_formatter(verbosity) try: # Check if a bundle is initialized first @@ -314,7 +312,7 @@ async def kubectl(args: KubectlCommandArgs) -> List[TextContent]: return check_response_size(formatted_error, "kubectl", formatter) # Execute the kubectl command - result = await get_kubectl_executor().execute(args.command, args.timeout, args.json_output) + result = await get_kubectl_executor().execute(command, timeout, json_output) # Format response using the formatter response = formatter.format_kubectl_result(result) @@ -370,7 +368,9 @@ async def kubectl(args: KubectlCommandArgs) -> List[TextContent]: @mcp.tool() -async def list_files(args: ListFilesArgs) -> List[TextContent]: +async def list_files( + path: str, recursive: bool = False, verbosity: Optional[str] = None +) -> List[TextContent]: """ List files and directories within the support bundle. This tool lets you explore the directory structure of the initialized bundle. @@ -379,23 +379,22 @@ async def list_files(args: ListFilesArgs) -> List[TextContent]: If no bundle is initialized, use the `list_available_bundles` tool to find available bundles. Args: - args: Arguments containing: - path: (string, required) The path within the bundle to list. Use "" or "/" - for root directory. Path cannot contain directory traversal (e.g., "../"). - recursive: (boolean, optional) Whether to list files and directories recursively. - Defaults to False. Set to True to show nested files. - verbosity: (string, optional) Verbosity level for response formatting - (minimal|standard|verbose|debug). Defaults to minimal. + path: (string, required) The path within the bundle to list. Use "" or "/" + for root directory. Path cannot contain directory traversal (e.g., "../"). + recursive: (boolean, optional) Whether to list files and directories recursively. + Defaults to False. Set to True to show nested files. + verbosity: (string, optional) Verbosity level for response formatting + (minimal|standard|verbose|debug). Defaults to minimal. Returns: A JSON list of entries with file/directory information including name, path, type (file or dir), size, access time, modification time, and whether binary. Also returns metadata about the directory listing like total file and directory counts. """ - formatter = get_formatter(args.verbosity) + formatter = get_formatter(verbosity) try: - result = await get_file_explorer().list_files(args.path, args.recursive) + result = await get_file_explorer().list_files(path, recursive) response = formatter.format_file_list(result) return check_response_size(response, "list_files", formatter) @@ -417,7 +416,9 @@ async def list_files(args: ListFilesArgs) -> List[TextContent]: @mcp.tool() -async def read_file(args: ReadFileArgs) -> List[TextContent]: +async def read_file( + path: str, start_line: int = 0, end_line: Optional[int] = None, verbosity: Optional[str] = None +) -> List[TextContent]: """ Read a file within the support bundle with optional line range filtering. Displays file content with line numbers. @@ -426,24 +427,23 @@ async def read_file(args: ReadFileArgs) -> List[TextContent]: If no bundle is initialized, use the `list_available_bundles` tool to find available bundles. Args: - args: Arguments containing: - path: (string, required) The path to the file within the bundle to read. - Path cannot contain directory traversal (e.g., "../"). - start_line: (integer, optional) The line number to start reading from (0-indexed). - Defaults to 0 (the first line). - end_line: (integer or null, optional) The line number to end reading at - (0-indexed, inclusive). Defaults to null, which means read to the end of the file. - verbosity: (string, optional) Verbosity level for response formatting - (minimal|standard|verbose|debug). Defaults to minimal. + path: (string, required) The path to the file within the bundle to read. + Path cannot contain directory traversal (e.g., "../"). + start_line: (integer, optional) The line number to start reading from (0-indexed). + Defaults to 0 (the first line). + end_line: (integer or null, optional) The line number to end reading at + (0-indexed, inclusive). Defaults to null, which means read to the end of the file. + verbosity: (string, optional) Verbosity level for response formatting + (minimal|standard|verbose|debug). Defaults to minimal. Returns: The content of the file with line numbers. For text files, displays the specified line range with line numbers. For binary files, displays a hex dump. """ - formatter = get_formatter(args.verbosity) + formatter = get_formatter(verbosity) try: - result = await get_file_explorer().read_file(args.path, args.start_line, args.end_line) + result = await get_file_explorer().read_file(path, start_line, end_line) response = formatter.format_file_content(result) return check_response_size(response, "read_file", formatter) @@ -465,7 +465,17 @@ async def read_file(args: ReadFileArgs) -> List[TextContent]: @mcp.tool() -async def grep_files(args: GrepFilesArgs) -> List[TextContent]: +async def grep_files( + pattern: str, + path: str, + recursive: bool = True, + glob_pattern: Optional[str] = None, + case_sensitive: bool = False, + max_results: int = 1000, + max_results_per_file: int = 5, + max_files: int = 10, + verbosity: Optional[str] = None, +) -> List[TextContent]: """ Search for patterns in files within the support bundle. Searches both file content and filenames, making it useful for finding keywords, error messages, or identifying files. @@ -474,36 +484,41 @@ async def grep_files(args: GrepFilesArgs) -> List[TextContent]: If no bundle is initialized, use the `list_available_bundles` tool to find available bundles. Args: - args: Arguments containing: - pattern: (string, required) The pattern to search for. Supports regex syntax. - path: (string, required) The path within the bundle to search. Use "" or "/" - to search from root. Path cannot contain directory traversal (e.g., "../"). - recursive: (boolean, optional) Whether to search recursively in subdirectories. - Defaults to True. - glob_pattern: (string or null, optional) File pattern to filter which files - to search (e.g., "*.yaml", "*.{json,log}"). Defaults to null (search all files). - case_sensitive: (boolean, optional) Whether the search is case-sensitive. - Defaults to False (case-insensitive search). - max_results: (integer, optional) Maximum number of results to return. - Defaults to 1000. - verbosity: (string, optional) Verbosity level for response formatting - (minimal|standard|verbose|debug). Defaults to minimal. + pattern: (string, required) The pattern to search for. Supports regex syntax. + path: (string, required) The path within the bundle to search. Use "" or "/" + to search from root. Path cannot contain directory traversal (e.g., "../"). + recursive: (boolean, optional) Whether to search recursively in subdirectories. + Defaults to True. + glob_pattern: (string or null, optional) File pattern to filter which files + to search (e.g., "*.yaml", "*.{json,log}"). Defaults to null (search all files). + case_sensitive: (boolean, optional) Whether the search is case-sensitive. + Defaults to False (case-insensitive search). + max_results: (integer, optional) Maximum number of results to return. + Defaults to 1000. + max_results_per_file: (integer, optional) Maximum number of results to return per file. + Defaults to 5. + max_files: (integer, optional) Maximum number of files to search/return. + Defaults to 10. + verbosity: (string, optional) Verbosity level for response formatting + (minimal|standard|verbose|debug). Defaults to minimal. Returns: Matches found in file contents and filenames, grouped by file. Also includes search metadata such as the number of files searched and the total number of matches found. """ - formatter = get_formatter(args.verbosity) + formatter = get_formatter(verbosity) try: result = await get_file_explorer().grep_files( - args.pattern, - args.path, - args.recursive, - args.glob_pattern, - args.case_sensitive, - args.max_results, + pattern, + path, + recursive, + glob_pattern, + case_sensitive, + max_results, + max_results_per_file, + max_files, ) response = formatter.format_grep_results(result) diff --git a/tasks/backlog/standardize-mcp-schemas.md b/tasks/active/standardize-mcp-schemas.md similarity index 99% rename from tasks/backlog/standardize-mcp-schemas.md rename to tasks/active/standardize-mcp-schemas.md index d95cd0f..342485f 100644 --- a/tasks/backlog/standardize-mcp-schemas.md +++ b/tasks/active/standardize-mcp-schemas.md @@ -1,9 +1,10 @@ # Task: Standardize MCP Argument Schemas -**Status**: backlog +**Status**: active **Priority**: high -**Assignee**: TBD +**Assignee**: Claude **Created**: 2025-07-29 +**Started**: 2025-07-29 **Tags**: mcp, schema, compatibility, fastmcp ## Problem Statement diff --git a/tests/e2e/test_direct_tool_integration.py b/tests/e2e/test_direct_tool_integration.py index 09ec909..aaeeee3 100644 --- a/tests/e2e/test_direct_tool_integration.py +++ b/tests/e2e/test_direct_tool_integration.py @@ -20,16 +20,6 @@ grep_files, kubectl, ) -from src.mcp_server_troubleshoot.bundle import ( - InitializeBundleArgs, - ListAvailableBundlesArgs, -) -from src.mcp_server_troubleshoot.files import ( - ListFilesArgs, - ReadFileArgs, - GrepFilesArgs, -) -from src.mcp_server_troubleshoot.kubectl import KubectlCommandArgs from tests.integration.mcp_test_utils import get_test_bundle_path @@ -81,10 +71,10 @@ async def test_initialize_bundle_tool_direct(self, test_bundle_copy): This is the core test that verifies bundle loading works correctly. """ - args = InitializeBundleArgs(source=str(test_bundle_copy)) - # This should complete in ~6 seconds based on our direct tests - content = await asyncio.wait_for(initialize_bundle(args), timeout=15.0) + content = await asyncio.wait_for( + initialize_bundle(source=str(test_bundle_copy)), timeout=15.0 + ) # Verify successful bundle loading assert len(content) > 0, "initialize_bundle should return content" @@ -114,8 +104,7 @@ async def test_initialize_bundle_tool_direct(self, test_bundle_copy): async def test_list_available_bundles_tool_direct(self, test_bundle_copy): """Test listing available bundles.""" # List available bundles (should include the bundle file we copied) - list_args = ListAvailableBundlesArgs() - content = await list_available_bundles(list_args) + content = await list_available_bundles() assert len(content) > 0, "Should have bundle list content" @@ -139,12 +128,10 @@ async def test_list_available_bundles_tool_direct(self, test_bundle_copy): async def test_file_operations_direct(self, test_bundle_copy): """Test file operations (list_files, read_file) via direct calls.""" # Initialize bundle first - init_args = InitializeBundleArgs(source=str(test_bundle_copy)) - await initialize_bundle(init_args) + await initialize_bundle(source=str(test_bundle_copy)) # Test list_files - list_args = ListFilesArgs(path="/", recursive=False) - list_content = await list_files(list_args) + list_content = await list_files(path="/", recursive=False) assert len(list_content) > 0, "Should have file listing content" files_text = list_content[0].text @@ -165,8 +152,7 @@ async def test_file_operations_direct(self, test_bundle_copy): if first_file.get("type") == "file": file_path = first_file.get("path", first_file.get("name", "")) if file_path: - read_args = ReadFileArgs(path=file_path) - read_content = await read_file(read_args) + read_content = await read_file(path=file_path) assert len(read_content) > 0, f"Should be able to read file {file_path}" except json.JSONDecodeError: # If not JSON, just verify we got some text output @@ -178,19 +164,16 @@ async def test_file_operations_direct(self, test_bundle_copy): async def test_grep_functionality_direct(self, test_bundle_copy): """Test grep functionality via direct calls.""" # Initialize bundle first - init_args = InitializeBundleArgs(source=str(test_bundle_copy)) - await initialize_bundle(init_args) + await initialize_bundle(source=str(test_bundle_copy)) # Test grep for common patterns - grep_args = GrepFilesArgs( + grep_content = await grep_files( pattern="kube", # Look for kubernetes-related content path="/", recursive=True, case_sensitive=False, max_results=100, ) - - grep_content = await grep_files(grep_args) assert len(grep_content) > 0, "Should have grep results content" # The grep might not find anything, but should return valid response @@ -201,14 +184,13 @@ async def test_grep_functionality_direct(self, test_bundle_copy): async def test_kubectl_tool_direct(self, test_bundle_copy): """Test kubectl tool via direct calls.""" # Initialize bundle first - init_args = InitializeBundleArgs(source=str(test_bundle_copy)) - await initialize_bundle(init_args) + await initialize_bundle(source=str(test_bundle_copy)) # Test kubectl version command (should work even with limited cluster) - kubectl_args = KubectlCommandArgs(command="version --client", timeout=10, json_output=False) - try: - kubectl_content = await asyncio.wait_for(kubectl(kubectl_args), timeout=15.0) + kubectl_content = await asyncio.wait_for( + kubectl(command="version --client", timeout=10, json_output=False), timeout=15.0 + ) assert len(kubectl_content) > 0, "Should have kubectl output" kubectl_text = kubectl_content[0].text @@ -228,22 +210,19 @@ async def test_kubectl_tool_direct(self, test_bundle_copy): async def test_complete_workflow_direct(self, test_bundle_copy): """Test complete bundle analysis workflow via direct tool calls.""" # Step 1: Initialize bundle - init_args = InitializeBundleArgs(source=str(test_bundle_copy)) - init_content = await initialize_bundle(init_args) + init_content = await initialize_bundle(source=str(test_bundle_copy)) assert len(init_content) > 0, "Bundle initialization should return content" print(f"Bundle initialized: {init_content[0].text[:100]}...") # Step 2: List available bundles - list_args = ListAvailableBundlesArgs() - list_content = await list_available_bundles(list_args) + list_content = await list_available_bundles() assert len(list_content) > 0, "Should have bundle list" print(f"Available bundles: {len(list_content)} entries") # Step 3: List files in bundle - files_args = ListFilesArgs(path="/", recursive=True) - files_content = await list_files(files_args) + files_content = await list_files(path="/", recursive=True) assert len(files_content) > 0, "Should have file listing" print(f"File listing obtained: {len(files_content[0].text)} chars") diff --git a/tests/integration/test_tool_functions.py b/tests/integration/test_tool_functions.py index bfa82ff..dfe3c5c 100644 --- a/tests/integration/test_tool_functions.py +++ b/tests/integration/test_tool_functions.py @@ -31,12 +31,6 @@ kubectl, initialize_with_bundle_dir, ) -from mcp_server_troubleshoot.bundle import ( - InitializeBundleArgs, - ListAvailableBundlesArgs, -) -from mcp_server_troubleshoot.files import ListFilesArgs, ReadFileArgs, GrepFilesArgs -from mcp_server_troubleshoot.kubectl import KubectlCommandArgs from .mcp_test_utils import get_test_bundle_path @@ -75,8 +69,7 @@ async def test_list_available_bundles_function(bundle_storage_dir): NOTE: This is a direct function call test, not MCP protocol testing. """ # Test with empty bundle directory first - args = ListAvailableBundlesArgs(include_invalid=False) - result = await list_available_bundles(args) + result = await list_available_bundles(include_invalid=False) # Verify result structure assert isinstance(result, list), "Result should be a list" @@ -104,8 +97,7 @@ async def test_initialize_bundle_function_local_file(bundle_storage_dir): test_bundle = get_test_bundle_path() # Call initialize_bundle function directly (not via MCP protocol) - args = InitializeBundleArgs(source=str(test_bundle), force=False) - result = await initialize_bundle(args) + result = await initialize_bundle(source=str(test_bundle), force=False) # Verify we got a result assert isinstance(result, list), "Tool result should be a list" @@ -143,16 +135,14 @@ async def test_initialize_bundle_function_force_flag(bundle_storage_dir): test_bundle = get_test_bundle_path() # First initialization - args1 = InitializeBundleArgs(source=str(test_bundle), force=False) - result1 = await initialize_bundle(args1) + result1 = await initialize_bundle(source=str(test_bundle), force=False) assert len(result1) > 0, "First initialization should succeed" response1_text = result1[0].text assert "Bundle initialized" in response1_text, "First initialization should report success" # Second initialization with force=True should also work - args2 = InitializeBundleArgs(source=str(test_bundle), force=True) - result2 = await initialize_bundle(args2) + result2 = await initialize_bundle(source=str(test_bundle), force=True) assert len(result2) > 0, "Second initialization with force should succeed" response2_text = result2[0].text @@ -171,18 +161,21 @@ async def test_initialize_bundle_validation_nonexistent_file(bundle_storage_dir) NOTE: This tests Pydantic validation, not MCP functionality. This could be moved to unit tests as it's testing the framework. """ - from pydantic_core import ValidationError - - # Try to create InitializeBundleArgs with nonexistent file (tests Pydantic validation) + # Try to initialize bundle with nonexistent file nonexistent_path = "/tmp/definitely-does-not-exist.tar.gz" - # Should raise ValidationError due to file not existing - with pytest.raises(ValidationError) as exc_info: - InitializeBundleArgs(source=nonexistent_path, force=False) - - # Verify the error message indicates the file wasn't found - error_msg = str(exc_info.value) - assert "Bundle source not found" in error_msg, "Should indicate bundle source not found" + # Should handle the error gracefully and return error response + try: + result = await initialize_bundle(source=nonexistent_path, force=False) + # If it doesn't raise an exception, check for error in response + assert len(result) > 0, "Should return error response" + error_text = result[0].text + assert "not found" in error_text.lower() or "error" in error_text.lower(), ( + "Should indicate error" + ) + except Exception as e: + # Exception is also acceptable for invalid input + assert "not found" in str(e).lower() or "does not exist" in str(e).lower() @pytest.mark.asyncio @@ -200,14 +193,12 @@ async def test_list_files_function_with_bundle(bundle_storage_dir): test_bundle = get_test_bundle_path() # Initialize bundle first by calling function directly - init_args = InitializeBundleArgs(source=str(test_bundle), force=True) - init_result = await initialize_bundle(init_args) + init_result = await initialize_bundle(source=str(test_bundle), force=True) assert len(init_result) > 0, "Bundle initialization should succeed" # Try to list files from root - list_args = ListFilesArgs(path="/", recursive=False) - list_result = await list_files(list_args) + list_result = await list_files(path="/", recursive=False) assert len(list_result) > 0, "List files should return results" @@ -232,17 +223,18 @@ async def test_pydantic_validation_invalid_parameters(bundle_storage_dir): NOTE: This tests Pydantic validation, not our business logic. Could be moved to unit tests or removed as framework testing. """ - from pydantic_core import ValidationError - # Test with invalid path containing directory traversal - with pytest.raises(ValidationError) as exc_info: - ListFilesArgs(path="../invalid", recursive=False) - - # Verify the error indicates path validation failure - error_msg = str(exc_info.value) - assert "Path cannot contain directory traversal" in error_msg, ( - "Should indicate path validation error" - ) + try: + result = await list_files(path="../invalid", recursive=False) + # If it doesn't raise an exception, check for error in response + assert len(result) > 0, "Should return error response" + error_text = result[0].text + assert "traversal" in error_text.lower() or "invalid" in error_text.lower(), ( + "Should indicate path error" + ) + except Exception as e: + # Exception is also acceptable for invalid input + assert "traversal" in str(e).lower() or "invalid" in str(e).lower() @pytest.mark.asyncio @@ -262,14 +254,12 @@ async def test_kubectl_function_execution(bundle_storage_dir): test_bundle = get_test_bundle_path() # Initialize bundle first by calling function directly - init_args = InitializeBundleArgs(source=str(test_bundle), force=True) - init_result = await initialize_bundle(init_args) + init_result = await initialize_bundle(source=str(test_bundle), force=True) assert len(init_result) > 0, "Bundle initialization should succeed" # Try a simple kubectl command via direct function call - kubectl_args = KubectlCommandArgs(command="get pods", timeout=10, json_output=True) - kubectl_result = await kubectl(kubectl_args) + kubectl_result = await kubectl(command="get pods", timeout=10, json_output=True) assert len(kubectl_result) > 0, "kubectl should return results" @@ -304,16 +294,13 @@ async def test_read_file_function_execution(bundle_storage_dir): test_bundle = get_test_bundle_path() # Initialize bundle first by calling function directly - init_args = InitializeBundleArgs(source=str(test_bundle), force=True) - init_result = await initialize_bundle(init_args) + init_result = await initialize_bundle(source=str(test_bundle), force=True) assert len(init_result) > 0, "Bundle initialization should succeed" # Try to read a common file via direct function call - read_args = ReadFileArgs(path="cluster-info/version.json", start_line=0, num_lines=10) - try: - read_result = await read_file(read_args) + read_result = await read_file(path="cluster-info/version.json", start_line=0, end_line=10) assert len(read_result) > 0, "read_file should return results" @@ -351,22 +338,15 @@ async def test_grep_files_function_execution(bundle_storage_dir): test_bundle = get_test_bundle_path() # Initialize bundle first by calling function directly - init_args = InitializeBundleArgs(source=str(test_bundle), force=True) - init_result = await initialize_bundle(init_args) + init_result = await initialize_bundle(source=str(test_bundle), force=True) assert len(init_result) > 0, "Bundle initialization should succeed" # Search for common pattern via direct function call - grep_args = GrepFilesArgs( - pattern="version", - path="/", - file_pattern="*.json", - case_sensitive=False, - recursive=True, + grep_result = await grep_files( + pattern="version", path="/", recursive=True, glob_pattern="*.json", case_sensitive=False ) - grep_result = await grep_files(grep_args) - assert len(grep_result) > 0, "grep_files should return results" # Verify the response structure @@ -398,8 +378,7 @@ async def test_file_operation_error_handling(bundle_storage_dir): # Try file operations without initializing bundle first # Test read_file without bundle - read_args = ReadFileArgs(path="nonexistent.txt", start_line=0, num_lines=10) - read_result = await read_file(read_args) + read_result = await read_file(path="nonexistent.txt", start_line=0, end_line=10) assert len(read_result) > 0, "Should return error response" content_item = read_result[0] diff --git a/tests/unit/test_files.py b/tests/unit/test_files.py index 96968c6..2d64274 100644 --- a/tests/unit/test_files.py +++ b/tests/unit/test_files.py @@ -164,7 +164,7 @@ async def test_file_explorer_list_files(): explorer = FileExplorer(mock_bundle_manager) # Test 1: List root directory non-recursively - result = await explorer.list_files("", False) + result = await explorer.list_files(path="", recursive=False) # Verify behavior expectations assert isinstance(result, FileListResult), "Result should be a FileListResult" @@ -175,7 +175,7 @@ async def test_file_explorer_list_files(): assert result.total_files >= 0, "Should have valid file count" # Test 2: List subdirectory recursively - result = await explorer.list_files("cluster-resources", True) + result = await explorer.list_files(path="cluster-resources", recursive=True) # Verify behavior expectations for recursive listing assert result.path == "cluster-resources", "Path should match requested directory" @@ -222,17 +222,19 @@ async def test_file_explorer_list_files_errors(): # Test 1: Listing a non-existent path raises an error with pytest.raises(PathNotFoundError): - await explorer.list_files("nonexistent_path", False) + await explorer.list_files(path="nonexistent_path", recursive=False) # Test 2: Listing a file (should raise an error) # We know from the real structure that cluster-resources/pods/kube-system.json exists with pytest.raises(Exception): - await explorer.list_files("cluster-resources/pods/kube-system.json", False) + await explorer.list_files( + path="cluster-resources/pods/kube-system.json", recursive=False + ) # Test 3: Without an active bundle should raise an error mock_bundle_manager.get_active_bundle.return_value = None with pytest.raises(Exception): - await explorer.list_files("", False) + await explorer.list_files(path="", recursive=False) @pytest.mark.asyncio @@ -264,7 +266,7 @@ async def test_file_explorer_read_file(): explorer = FileExplorer(mock_bundle_manager) # Test 1: Reading a text file (use real JSON file from bundle structure) - result = await explorer.read_file("cluster-resources/pods/kube-system.json") + result = await explorer.read_file(path="cluster-resources/pods/kube-system.json") # Verify behavior expectations assert isinstance(result, FileContentResult), "Result should be a FileContentResult" @@ -276,7 +278,9 @@ async def test_file_explorer_read_file(): assert result.total_lines > 0, "Line count should be available" # Test 2: Reading a line range from the same file - result = await explorer.read_file("cluster-resources/pods/kube-system.json", 1, 3) + result = await explorer.read_file( + path="cluster-resources/pods/kube-system.json", start_line=1, end_line=3 + ) # Verify behavior expectations for line ranges assert result.start_line == 1, "Start line should match requested value" @@ -284,7 +288,7 @@ async def test_file_explorer_read_file(): assert len(result.content.split("\n")) <= 4, "Should have limited lines based on range" # Test 3: Reading binary file (from the with_binaries structure) - result = await explorer.read_file("binaries/fake_binary") + result = await explorer.read_file(path="binaries/fake_binary") # Verify behavior expectations for binary files assert result.path == "binaries/fake_binary", "Path should be preserved in result" @@ -321,16 +325,16 @@ async def test_file_explorer_read_file_errors(): # Test 1: Reading a non-existent file raises PathNotFoundError with pytest.raises(PathNotFoundError): - await explorer.read_file("nonexistent.txt") + await explorer.read_file(path="nonexistent.txt") # Test 2: Reading a directory raises ReadFileError with pytest.raises(ReadFileError): - await explorer.read_file("cluster-resources") + await explorer.read_file(path="cluster-resources") # Test 3: Without an active bundle should raise an error mock_bundle_manager.get_active_bundle.return_value = None with pytest.raises(Exception): - await explorer.read_file("cluster-resources/pods/kube-system.json") + await explorer.read_file(path="cluster-resources/pods/kube-system.json") @pytest.mark.asyncio @@ -373,7 +377,7 @@ async def test_file_explorer_grep_files(): explorer = FileExplorer(mock_bundle_manager) # Test 1: Global search for common pattern - result = await explorer.grep_files("test", "", True) + result = await explorer.grep_files(pattern="test", path="", recursive=True) # Verify behavior expectations assert isinstance(result, GrepResult), "Result should be a GrepResult" @@ -392,7 +396,9 @@ async def test_file_explorer_grep_files(): assert hasattr(match, "path"), "Match should have path" # Test 2: Directory-restricted search with glob pattern - result = await explorer.grep_files("test", "test_files", True, "*.txt") + result = await explorer.grep_files( + pattern="test", path="test_files", recursive=True, glob_pattern="*.txt" + ) # Verify behavior expectations assert result.pattern == "test", "Pattern should be preserved" @@ -402,10 +408,14 @@ async def test_file_explorer_grep_files(): # Test 3: Case sensitivity behavior # First test with case sensitive search - case_sensitive = await explorer.grep_files("UPPERCASE", "", True, None, True) + case_sensitive = await explorer.grep_files( + pattern="UPPERCASE", path="", recursive=True, glob_pattern=None, case_sensitive=True + ) # Now test with case insensitive search - case_insensitive = await explorer.grep_files("uppercase", "", True, None, False) + case_insensitive = await explorer.grep_files( + pattern="uppercase", path="", recursive=True, glob_pattern=None, case_sensitive=False + ) # Verify behavior expectations for case sensitivity assert case_sensitive.total_matches >= 1, "Should find exact case matches" @@ -457,7 +467,9 @@ async def test_file_explorer_grep_files_with_kubeconfig(): explorer = FileExplorer(mock_bundle_manager) # Test searching for "specific" pattern - result = await explorer.grep_files("specific", "", True, None, False) + result = await explorer.grep_files( + pattern="specific", path="", recursive=True, glob_pattern=None, case_sensitive=False + ) # Verify behavior expectations assert isinstance(result, GrepResult), "Result should be a GrepResult" @@ -502,12 +514,12 @@ async def test_file_explorer_grep_files_errors(): # Test 1: Searching a non-existent path raises an error with pytest.raises(PathNotFoundError): - await explorer.grep_files("test", "nonexistent_path", True) + await explorer.grep_files(pattern="test", path="nonexistent_path", recursive=True) # Test 2: Without an active bundle should raise an error mock_bundle_manager.get_active_bundle.return_value = None with pytest.raises(Exception): - await explorer.grep_files("test", "", True) + await explorer.grep_files(pattern="test", path="", recursive=True) def test_file_explorer_is_binary(): diff --git a/tests/unit/test_files_parametrized.py b/tests/unit/test_files_parametrized.py index ebeb305..8244adc 100644 --- a/tests/unit/test_files_parametrized.py +++ b/tests/unit/test_files_parametrized.py @@ -278,10 +278,10 @@ async def test_file_explorer_list_files_error_handling( if expected_error: # Should raise an error with pytest.raises(expected_error): - await explorer.list_files(actual_path, False) + await explorer.list_files(path=actual_path, recursive=False) else: # Should succeed - result = await explorer.list_files(actual_path, False) + result = await explorer.list_files(path=actual_path, recursive=False) assert isinstance(result, FileListResult) assert result.path == actual_path assert result.total_files >= 0 @@ -334,10 +334,10 @@ async def test_file_explorer_read_file_error_handling( if expected_error: # Should raise an error with pytest.raises(expected_error): - await explorer.read_file(path) + await explorer.read_file(path=path) else: # Should succeed - result = await explorer.read_file(path) + result = await explorer.read_file(path=path) assert isinstance(result, FileContentResult) assert result.path == path assert result.content is not None @@ -389,7 +389,9 @@ async def test_file_explorer_grep_files_behavior( explorer = FileExplorer(bundle_manager) # Run the grep operation - result = await explorer.grep_files(pattern, path, True, None, case_sensitive) + result = await explorer.grep_files( + pattern=pattern, path=path, recursive=True, glob_pattern=None, case_sensitive=case_sensitive + ) # Verify the result structure assert isinstance(result, GrepResult) diff --git a/tests/unit/test_kubectl.py b/tests/unit/test_kubectl.py index c1438f0..85048b5 100644 --- a/tests/unit/test_kubectl.py +++ b/tests/unit/test_kubectl.py @@ -75,7 +75,7 @@ async def test_kubectl_executor_execute_no_bundle(): executor = KubectlExecutor(bundle_manager) with pytest.raises(KubectlError) as excinfo: - await executor.execute("get pods") + await executor.execute(command="get pods") assert "No bundle is initialized" in str(excinfo.value) assert excinfo.value.exit_code == 1 @@ -97,7 +97,7 @@ async def test_kubectl_executor_execute_host_only_bundle(): executor = KubectlExecutor(bundle_manager) with pytest.raises(KubectlError) as excinfo: - await executor.execute("get pods") + await executor.execute(command="get pods") assert "Host-only bundle detected" in str(excinfo.value) assert "no cluster resources" in str(excinfo.value) @@ -140,7 +140,7 @@ async def test_kubectl_executor_execute_success(): executor._run_kubectl_command = AsyncMock(return_value=mock_result) # Execute a command - result = await executor.execute("get pods") + result = await executor.execute(command="get pods") # Verify the result assert result == mock_result @@ -172,7 +172,9 @@ async def test_kubectl_executor_run_kubectl_command(): # Mock create_subprocess_exec with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: # Execute a command - result = await executor._run_kubectl_command("get pods", bundle, 30, True) + result = await executor._run_kubectl_command( + command="get pods", bundle=bundle, timeout=30, json_output=True + ) # Verify the result assert result.command == "get pods -o json" @@ -223,7 +225,9 @@ async def test_kubectl_executor_run_kubectl_command_no_json(): # Mock create_subprocess_exec with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: # Execute a command - result = await executor._run_kubectl_command("get pods", bundle, 30, False) + result = await executor._run_kubectl_command( + command="get pods", bundle=bundle, timeout=30, json_output=False + ) # Verify the result assert result.command == "get pods" @@ -271,7 +275,9 @@ async def test_kubectl_executor_run_kubectl_command_explicit_format(): # Mock create_subprocess_exec with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: # Execute a command with explicit format - result = await executor._run_kubectl_command("get pods -o yaml", bundle, 30, True) + result = await executor._run_kubectl_command( + command="get pods -o yaml", bundle=bundle, timeout=30, json_output=True + ) # Verify the result assert result.command == "get pods -o yaml" @@ -318,7 +324,9 @@ async def test_kubectl_executor_run_kubectl_command_error(): with patch("asyncio.create_subprocess_exec", return_value=mock_process): # Execute a command with pytest.raises(KubectlError) as excinfo: - await executor._run_kubectl_command("get pods", bundle, 30, True) + await executor._run_kubectl_command( + command="get pods", bundle=bundle, timeout=30, json_output=True + ) # Verify the error assert "kubectl command failed" in str(excinfo.value) @@ -365,7 +373,9 @@ async def mock_subprocess_timeout(*args, **kwargs): ): # Execute a command with a short timeout with pytest.raises(KubectlError) as excinfo: - await executor._run_kubectl_command("get pods", bundle, 0.1, True) # 0.1 second timeout + await executor._run_kubectl_command( + command="get pods", bundle=bundle, timeout=0.1, json_output=True + ) # 0.1 second timeout # Verify the error assert "kubectl command timed out" in str(excinfo.value) @@ -432,7 +442,9 @@ async def test_kubectl_default_cli_format(): # Mock create_subprocess_exec with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: # Execute with default json_output=False - result = await executor._run_kubectl_command("get pods", bundle, 30, False) + result = await executor._run_kubectl_command( + command="get pods", bundle=bundle, timeout=30, json_output=False + ) # Verify CLI format is returned assert result.is_json is False @@ -473,7 +485,9 @@ async def test_kubectl_explicit_json_request(): # Mock create_subprocess_exec with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: # Execute with explicit json_output=True - result = await executor._run_kubectl_command("get pods", bundle, 30, True) + result = await executor._run_kubectl_command( + command="get pods", bundle=bundle, timeout=30, json_output=True + ) # Verify JSON format is returned assert result.is_json is True @@ -514,7 +528,9 @@ async def test_kubectl_user_format_preserved(): # Mock create_subprocess_exec with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: # Execute with user-specified YAML format - result = await executor._run_kubectl_command("get pods -o yaml", bundle, 30, False) + result = await executor._run_kubectl_command( + command="get pods -o yaml", bundle=bundle, timeout=30, json_output=False + ) # Verify user format is preserved assert result.command == "get pods -o yaml" # No modification @@ -557,7 +573,7 @@ async def test_kubectl_executor_defaults_to_table_format(): executor._run_kubectl_command = AsyncMock(return_value=mock_result) # Execute a command WITHOUT specifying json_output (should default to False) - result = await executor.execute("get pods") + result = await executor.execute(command="get pods") # Verify the result is NOT JSON assert result.is_json is False, "Default kubectl execution should NOT return JSON format" @@ -626,7 +642,7 @@ async def test_kubectl_executor_host_only_bundle(): # Test that executing any kubectl command raises appropriate error with pytest.raises(KubectlError) as exc_info: - await executor.execute("get pods") + await executor.execute(command="get pods") # Verify the error message is appropriate for host-only bundles error = exc_info.value @@ -668,7 +684,7 @@ async def test_kubectl_executor_regular_bundle_not_affected(): with patch("pathlib.Path.exists", return_value=True): with patch("asyncio.create_subprocess_exec", return_value=mock_process): # This should work normally (no host-only bundle error) - result = await executor.execute("get pods", json_output=False) + result = await executor.execute(command="get pods", json_output=False) # Verify it returns a normal result assert result.exit_code == 0 @@ -687,7 +703,7 @@ async def test_kubectl_executor_no_bundle_still_works(): # Test that it raises the normal "no bundle" error, not host-only error with pytest.raises(KubectlError) as exc_info: - await executor.execute("get pods") + await executor.execute(command="get pods") # Verify this is the standard "no bundle" error error = exc_info.value diff --git a/tests/unit/test_kubectl_parametrized.py b/tests/unit/test_kubectl_parametrized.py index feb4bf5..a190582 100644 --- a/tests/unit/test_kubectl_parametrized.py +++ b/tests/unit/test_kubectl_parametrized.py @@ -153,7 +153,9 @@ async def test_kubectl_command_execution_parameters(command, expected_args, add_ # Mock the create_subprocess_exec function with patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec: # Execute the command - result = await executor._run_kubectl_command(command, bundle, 30, True) + result = await executor._run_kubectl_command( + command=command, bundle=bundle, timeout=30, json_output=True + ) # Verify the command was constructed correctly mock_exec.assert_awaited_once() @@ -240,14 +242,18 @@ async def test_kubectl_error_handling( if should_raise: # Should raise KubectlError with pytest.raises(KubectlError) as excinfo: - await executor._run_kubectl_command("get pods", bundle, 30, True) + await executor._run_kubectl_command( + command="get pods", bundle=bundle, timeout=30, json_output=True + ) # Verify error details assert excinfo.value.exit_code == expected_exit_code assert stderr_content in excinfo.value.stderr else: # Should succeed - result = await executor._run_kubectl_command("get pods", bundle, 30, True) + result = await executor._run_kubectl_command( + command="get pods", bundle=bundle, timeout=30, json_output=True + ) # Verify result details assert result.exit_code == expected_exit_code @@ -298,7 +304,9 @@ async def mock_subprocess_timeout(*args, **kwargs): side_effect=mock_subprocess_timeout, ): with pytest.raises(KubectlError) as excinfo: - await executor._run_kubectl_command("get pods", bundle, 0.1, True) + await executor._run_kubectl_command( + command="get pods", bundle=bundle, timeout=0.1, json_output=True + ) # Verify error details assert "timed out" in str(excinfo.value).lower() diff --git a/tests/unit/test_schema_validation.py b/tests/unit/test_schema_validation.py new file mode 100644 index 0000000..ae848a8 --- /dev/null +++ b/tests/unit/test_schema_validation.py @@ -0,0 +1,258 @@ +""" +Test for MCP schema validation to prevent regression to args wrapper format. + +This file contains focused tests to ensure that MCP tool schemas never revert +to the non-standard 'args' wrapper format. +""" + +import json + +import pytest + +from mcp_server_troubleshoot.server import mcp + + +# Mark all tests in this file as unit tests and quick tests +pytestmark = [pytest.mark.unit, pytest.mark.quick] + + +@pytest.mark.asyncio +async def test_mcp_tool_schemas_do_not_use_args_wrapper(): + """ + Test that all MCP tool schemas generate standard format without 'args' wrapper. + + This test ensures that FastMCP generates tool schemas that follow the standard + MCP format with direct parameters, not wrapped in an 'args' object. + + The schemas should look like: + { + "properties": { + "source": {"type": "string", "description": "..."}, + "force": {"type": "boolean", "default": false, "description": "..."}, + "verbosity": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null} + }, + "required": ["source"], + "title": "initialize_bundleArguments", + "type": "object" + } + + NOT like: + { + "properties": { + "args": { + "$ref": "#/$defs/InitializeBundleArgs" + } + }, + "required": ["args"], + "title": "initialize_bundleArguments", + "type": "object" + } + """ + # Get all tools from the FastMCP server + tools = await mcp.list_tools() + + # Assert we have the expected tools + expected_tool_names = { + "initialize_bundle", + "list_available_bundles", + "kubectl", + "list_files", + "read_file", + "grep_files", + } + + actual_tool_names = {tool.name for tool in tools} + assert actual_tool_names == expected_tool_names, ( + f"Expected tools {expected_tool_names}, got {actual_tool_names}" + ) + + # Check each tool's schema + for tool in tools: + tool_name = tool.name + input_schema = tool.inputSchema + + # Convert schema to dict if it's not already + if hasattr(input_schema, "model_dump"): + schema_dict = input_schema.model_dump() + else: + schema_dict = input_schema + + # Assert the schema is a dict + assert isinstance(schema_dict, dict), f"Schema for {tool_name} should be a dict" + + # Assert the schema is an object type + assert schema_dict.get("type") == "object", f"Schema for {tool_name} should be object type" + + # Assert the schema has properties + assert "properties" in schema_dict, f"Schema for {tool_name} should have properties" + properties = schema_dict["properties"] + + # Critical assertion: properties should NOT contain an 'args' key + assert "args" not in properties, ( + f"Schema for {tool_name} contains 'args' wrapper - this violates standard MCP format" + ) + + # Verify that properties contain expected direct parameters + if tool_name == "initialize_bundle": + assert "source" in properties, "initialize_bundle schema should have 'source' parameter" + assert "force" in properties, "initialize_bundle schema should have 'force' parameter" + assert "verbosity" in properties, ( + "initialize_bundle schema should have 'verbosity' parameter" + ) + + elif tool_name == "kubectl": + assert "command" in properties, "kubectl schema should have 'command' parameter" + assert "timeout" in properties, "kubectl schema should have 'timeout' parameter" + assert "json_output" in properties, "kubectl schema should have 'json_output' parameter" + assert "verbosity" in properties, "kubectl schema should have 'verbosity' parameter" + + elif tool_name == "list_files": + assert "path" in properties, "list_files schema should have 'path' parameter" + assert "recursive" in properties, "list_files schema should have 'recursive' parameter" + assert "verbosity" in properties, "list_files schema should have 'verbosity' parameter" + + elif tool_name == "read_file": + assert "path" in properties, "read_file schema should have 'path' parameter" + assert "start_line" in properties, "read_file schema should have 'start_line' parameter" + assert "end_line" in properties, "read_file schema should have 'end_line' parameter" + assert "verbosity" in properties, "read_file schema should have 'verbosity' parameter" + + elif tool_name == "grep_files": + assert "pattern" in properties, "grep_files schema should have 'pattern' parameter" + assert "path" in properties, "grep_files schema should have 'path' parameter" + assert "recursive" in properties, "grep_files schema should have 'recursive' parameter" + assert "glob_pattern" in properties, ( + "grep_files schema should have 'glob_pattern' parameter" + ) + assert "case_sensitive" in properties, ( + "grep_files schema should have 'case_sensitive' parameter" + ) + assert "max_results" in properties, ( + "grep_files schema should have 'max_results' parameter" + ) + assert "max_results_per_file" in properties, ( + "grep_files schema should have 'max_results_per_file' parameter" + ) + assert "max_files" in properties, "grep_files schema should have 'max_files' parameter" + assert "verbosity" in properties, "grep_files schema should have 'verbosity' parameter" + + elif tool_name == "list_available_bundles": + assert "include_invalid" in properties, ( + "list_available_bundles schema should have 'include_invalid' parameter" + ) + assert "verbosity" in properties, ( + "list_available_bundles schema should have 'verbosity' parameter" + ) + + # Check that no $refs point to Args classes (another sign of wrapper usage) + schema_json = json.dumps(schema_dict) + assert "Args" not in schema_json, ( + f"Schema for {tool_name} contains references to Args classes" + ) + + +@pytest.mark.asyncio +async def test_tool_schemas_are_valid_json_schema(): + """ + Test that all tool schemas are valid JSON schemas. + + This ensures the schemas are well-formed and can be used by MCP clients. + """ + tools = await mcp.list_tools() + + for tool in tools: + input_schema = tool.inputSchema + + # Convert schema to dict if it's not already + if hasattr(input_schema, "model_dump"): + schema_dict = input_schema.model_dump() + else: + schema_dict = input_schema + + # Basic JSON schema validation + assert isinstance(schema_dict, dict) + assert "type" in schema_dict + assert schema_dict["type"] == "object" + assert "properties" in schema_dict + assert isinstance(schema_dict["properties"], dict) + + # If required is present, it should be a list + if "required" in schema_dict: + assert isinstance(schema_dict["required"], list) + + # Title should be present + assert "title" in schema_dict + assert isinstance(schema_dict["title"], str) + + +@pytest.mark.asyncio +async def test_verbosity_parameter_consistency(): + """ + Test that all tools have a consistent verbosity parameter definition. + + The verbosity parameter should be optional and allow string or null values. + """ + tools = await mcp.list_tools() + + for tool in tools: + input_schema = tool.inputSchema + + # Convert schema to dict if it's not already + if hasattr(input_schema, "model_dump"): + schema_dict = input_schema.model_dump() + else: + schema_dict = input_schema + + properties = schema_dict["properties"] + + # All tools should have verbosity parameter + assert "verbosity" in properties, f"Tool {tool.name} is missing verbosity parameter" + + verbosity_schema = properties["verbosity"] + + # Verbosity should allow string or null + assert "anyOf" in verbosity_schema or "type" in verbosity_schema, ( + f"Tool {tool.name} verbosity parameter has invalid schema" + ) + + # Verbosity should be optional (not in required list) + required = schema_dict.get("required", []) + assert "verbosity" not in required, ( + f"Tool {tool.name} verbosity parameter should be optional" + ) + + +@pytest.mark.asyncio +async def test_required_parameters_match_expectations(): + """ + Test that each tool has the expected required parameters. + """ + tools = {tool.name: tool for tool in await mcp.list_tools()} + + # Define expected required parameters for each tool + expected_required = { + "initialize_bundle": ["source"], + "list_available_bundles": [], # No required parameters + "kubectl": ["command"], + "list_files": ["path"], + "read_file": ["path"], + "grep_files": ["pattern", "path"], + } + + for tool_name, expected in expected_required.items(): + assert tool_name in tools, f"Tool {tool_name} not found" + + tool = tools[tool_name] + input_schema = tool.inputSchema + + # Convert schema to dict if it's not already + if hasattr(input_schema, "model_dump"): + schema_dict = input_schema.model_dump() + else: + schema_dict = input_schema + + actual_required = schema_dict.get("required", []) + + assert set(actual_required) == set(expected), ( + f"Tool {tool_name} required parameters mismatch. Expected {expected}, got {actual_required}" + ) diff --git a/tests/unit/test_server.py b/tests/unit/test_server.py index e0a399c..c854705 100644 --- a/tests/unit/test_server.py +++ b/tests/unit/test_server.py @@ -97,13 +97,10 @@ async def test_initialize_bundle_tool(tmp_path: Path) -> None: mock_diag.return_value = {"api_server_available": True} mock_get_manager.return_value = bundle_manager - # Create InitializeBundleArgs instance - from mcp_server_troubleshoot.bundle import InitializeBundleArgs - - args = InitializeBundleArgs(source=str(temp_source_file), force=False, verbosity="verbose") - - # Call the tool function directly - response = await initialize_bundle(args) + # Call the tool function directly with parameters + response = await initialize_bundle( + source=str(temp_source_file), force=False, verbosity="verbose" + ) # Verify the bundle manager methods were called mock_sbctl.assert_awaited_once() @@ -174,16 +171,11 @@ async def test_kubectl_tool(tmp_path: Path) -> None: mock_get_manager.return_value = bundle_manager mock_get_executor.return_value = kubectl_executor - # Create KubectlCommandArgs instance - from mcp_server_troubleshoot.kubectl import KubectlCommandArgs - - args = KubectlCommandArgs( + # Call the tool function directly with parameters + response = await kubectl( command="get pods", timeout=30, json_output=True, verbosity="verbose" ) - # Call the tool function directly - response = await kubectl(args) - # Verify the API server check was called mock_api.assert_awaited_once() # Verify the kubectl executor was called @@ -235,16 +227,11 @@ async def test_kubectl_tool_host_only_bundle(tmp_path: Path) -> None: ): mock_get_manager.return_value = bundle_manager - # Create KubectlCommandArgs instance - from mcp_server_troubleshoot.kubectl import KubectlCommandArgs - - args = KubectlCommandArgs( + # Call the tool function directly with parameters + response = await kubectl( command="get pods", timeout=30, json_output=True, verbosity="verbose" ) - # Call the tool function directly - response = await kubectl(args) - # Verify the error response assert isinstance(response, list) assert len(response) == 1 @@ -303,10 +290,9 @@ async def test_file_operations(tmp_path: Path) -> None: mock_get_explorer.return_value = file_explorer # 1. Test list_files with real files - from mcp_server_troubleshoot.files import ListFilesArgs - - list_args = ListFilesArgs(path="test_data/dir1", recursive=False, verbosity="verbose") - list_response = await list_files(list_args) + list_response = await list_files( + path="test_data/dir1", recursive=False, verbosity="verbose" + ) # Verify the response contains real file information assert len(list_response) == 1 @@ -315,15 +301,12 @@ async def test_file_operations(tmp_path: Path) -> None: assert "file1.txt" in list_response[0].text # 2. Test read_file with real file - from mcp_server_troubleshoot.files import ReadFileArgs - - read_args = ReadFileArgs( + read_response = await read_file( path="test_data/dir1/file1.txt", start_line=0, end_line=2, verbosity="verbose", ) - read_response = await read_file(read_args) # Verify the response contains real file content assert len(read_response) == 1 @@ -332,9 +315,7 @@ async def test_file_operations(tmp_path: Path) -> None: assert "This is the file content" in read_response[0].text # 3. Test grep_files with real file content - from mcp_server_troubleshoot.files import GrepFilesArgs - - grep_args = GrepFilesArgs( + grep_response = await grep_files( pattern="pattern", path="test_data", recursive=True, @@ -345,7 +326,6 @@ async def test_file_operations(tmp_path: Path) -> None: max_files=10, verbosity="verbose", ) - grep_response = await grep_files(grep_args) # Verify the response contains real grep results assert len(grep_response) == 1 diff --git a/tests/unit/test_server_parametrized.py b/tests/unit/test_server_parametrized.py index 692e304..114d465 100644 --- a/tests/unit/test_server_parametrized.py +++ b/tests/unit/test_server_parametrized.py @@ -159,13 +159,10 @@ async def test_initialize_bundle_tool_parametrized( mock_diag.return_value = {"api_server_available": api_available} mock_get_manager.return_value = bundle_manager - # Create InitializeBundleArgs instance - from mcp_server_troubleshoot.bundle import InitializeBundleArgs - - args = InitializeBundleArgs(source=str(temp_source_file), force=force, verbosity="verbose") - - # Call the tool function - response = await initialize_bundle(args) + # Call the tool function with direct parameters + response = await initialize_bundle( + source=str(temp_source_file), force=force, verbosity="verbose" + ) # Verify method calls on real instance mock_sbctl.assert_awaited_once() @@ -294,19 +291,11 @@ async def test_kubectl_tool_parametrized( # For success cases, return the mock result mock_execute.return_value = mock_result - # Create KubectlCommandArgs instance - from mcp_server_troubleshoot.kubectl import KubectlCommandArgs - - args = KubectlCommandArgs( - command=command, - timeout=timeout, - json_output=json_output, - verbosity="verbose", + # Call the tool function with direct parameters + response = await kubectl( + command=command, timeout=timeout, json_output=json_output, verbosity="verbose" ) - # Call the tool function - response = await kubectl(args) - # Verify API check called on real instance mock_api.assert_awaited_once() @@ -510,42 +499,42 @@ async def test_file_operations_parametrized( # Execute the appropriate file operation with real components if file_operation == "list_files": - from mcp_server_troubleshoot.files import ListFilesArgs - # Adjust path to real directory structure - args_copy = args.copy() - if args_copy["path"] == "dir1": - args_copy["path"] = "test_data/dir1" - args_copy["verbosity"] = "minimal" - list_operation_args = ListFilesArgs(**args_copy) - response = await list_files(list_operation_args) + path = args["path"] + if path == "dir1": + path = "test_data/dir1" + response = await list_files(path=path, recursive=args["recursive"], verbosity="minimal") elif file_operation == "read_file": - from mcp_server_troubleshoot.files import ReadFileArgs - # Adjust path to real file structure - args_copy = args.copy() - if args_copy["path"] == "dir1/file1.txt": - args_copy["path"] = "test_data/dir1/file1.txt" - args_copy["verbosity"] = "minimal" - read_operation_args = ReadFileArgs(**args_copy) - response = await read_file(read_operation_args) + path = args["path"] + if path == "dir1/file1.txt": + path = "test_data/dir1/file1.txt" + response = await read_file( + path=path, + start_line=args["start_line"], + end_line=args["end_line"], + verbosity="minimal", + ) elif file_operation == "grep_files": - from mcp_server_troubleshoot.files import GrepFilesArgs - # Adjust path to real directory structure - args_copy = args.copy() - if args_copy["path"] == "dir1": - args_copy["path"] = "test_data/dir1" - elif args_copy["path"] == ".": - args_copy["path"] = "test_data" - # Add missing required args - args_copy.setdefault("max_results_per_file", 50) - args_copy.setdefault("max_files", 10) - args_copy["verbosity"] = "minimal" - grep_operation_args = GrepFilesArgs(**args_copy) - response = await grep_files(grep_operation_args) + path = args["path"] + if path == "dir1": + path = "test_data/dir1" + elif path == ".": + path = "test_data" + response = await grep_files( + pattern=args["pattern"], + path=path, + recursive=args["recursive"], + glob_pattern=args["glob_pattern"], + case_sensitive=args["case_sensitive"], + max_results=args["max_results"], + max_results_per_file=args.get("max_results_per_file", 50), + max_files=args.get("max_files", 10), + verbosity="minimal", + ) # Use the test assertion helper to verify response test_assertions.assert_api_response_valid(response, "text", expected_strings) @@ -636,26 +625,18 @@ async def test_file_operations_error_handling( mock_get_explorer.return_value = file_explorer # Test all three file operations with the same error - from mcp_server_troubleshoot.files import ( - ListFilesArgs, - ReadFileArgs, - GrepFilesArgs, - ) - # 1. Test list_files - list_args = ListFilesArgs(path="test/path", recursive=False, verbosity="verbose") - list_response = await list_files(list_args) + list_response = await list_files(path="test/path", recursive=False, verbosity="verbose") test_assertions.assert_api_response_valid(list_response, "text", expected_strings) # 2. Test read_file - read_args = ReadFileArgs( + read_response = await read_file( path="test/file.txt", start_line=0, end_line=None, verbosity="verbose" ) - read_response = await read_file(read_args) test_assertions.assert_api_response_valid(read_response, "text", expected_strings) # 3. Test grep_files - grep_args = GrepFilesArgs( + grep_response = await grep_files( pattern="test", path="test/path", recursive=True, @@ -666,7 +647,6 @@ async def test_file_operations_error_handling( max_files=10, verbosity="verbose", ) - grep_response = await grep_files(grep_args) test_assertions.assert_api_response_valid(grep_response, "text", expected_strings) # No manual cleanup needed - tmp_path handles it automatically @@ -773,13 +753,10 @@ class MockAvailableBundle: mock_list.return_value = bundles mock_get_manager.return_value = bundle_manager - # Create ListAvailableBundlesArgs instance - from mcp_server_troubleshoot.bundle import ListAvailableBundlesArgs - - args = ListAvailableBundlesArgs(include_invalid=include_invalid, verbosity="verbose") - - # Call the tool function - response = await list_available_bundles(args) + # Call the tool function with direct parameters + response = await list_available_bundles( + include_invalid=include_invalid, verbosity="verbose" + ) # Verify method call on real instance mock_list.assert_awaited_once_with(include_invalid)