Skip to content
7 changes: 6 additions & 1 deletion debug_mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ async def debug_mcp_server():

# Set up environment
env = os.environ.copy()
env.update({"SBCTL_TOKEN": "test-token-12345", "MCP_BUNDLE_STORAGE": str(temp_bundle_dir)})
env.update(
{
"SBCTL_TOKEN": "test-token-12345",
"MCP_BUNDLE_STORAGE": str(temp_bundle_dir),
}
)

print("\\n=== Starting MCP Server Manually ===")

Expand Down
15 changes: 12 additions & 3 deletions debug_sbctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ async def debug_sbctl():
# Test 1: Check if sbctl can read the bundle
print("\n=== Test 1: Running sbctl with --help ===")
process = await asyncio.create_subprocess_exec(
"sbctl", "--help", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
"sbctl",
"--help",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
print(f"sbctl --help exit code: {process.returncode}")
Expand All @@ -52,7 +55,10 @@ async def debug_sbctl():
# Set up environment
env = os.environ.copy()
env.update(
{"SBCTL_TOKEN": "test-token-12345", "KUBECONFIG": str(temp_dir_path / "kubeconfig")}
{
"SBCTL_TOKEN": "test-token-12345",
"KUBECONFIG": str(temp_dir_path / "kubeconfig"),
}
)

cmd = [
Expand All @@ -66,7 +72,10 @@ async def debug_sbctl():

# Start the process with timeout
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)

print(f"Process started with PID: {process.pid}")
Expand Down
Binary file modified packages/x86_64/APKINDEX.tar.gz
Binary file not shown.
Binary file modified packages/x86_64/troubleshoot-mcp-server-0.1.0-r0.apk
Binary file not shown.
4 changes: 3 additions & 1 deletion src/mcp_server_troubleshoot/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ def parse_args(args: Optional[List[str]] = None) -> argparse.Namespace:
parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
parser.add_argument("--bundle-dir", type=str, help="Directory to store support bundles")
parser.add_argument(
"--show-config", action="store_true", help="Show recommended MCP client configuration"
"--show-config",
action="store_true",
help="Show recommended MCP client configuration",
)
return parser.parse_args(args)

Expand Down
42 changes: 31 additions & 11 deletions src/mcp_server_troubleshoot/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
)

# Feature flags from environment variables
CLEANUP_ORPHANED = os.environ.get("SBCTL_CLEANUP_ORPHANED", "true").lower() in ("true", "1", "yes")
CLEANUP_ORPHANED = os.environ.get("SBCTL_CLEANUP_ORPHANED", "true").lower() in (
"true",
"1",
"yes",
)
ALLOW_ALTERNATIVE_KUBECONFIG = os.environ.get(
"SBCTL_ALLOW_ALTERNATIVE_KUBECONFIG", "true"
).lower() in ("true", "1", "yes")
Expand Down Expand Up @@ -97,7 +101,8 @@ class BundleMetadata(BaseModel):
kubeconfig_path: Path = Field(description="The path to the kubeconfig file")
initialized: bool = Field(description="Whether the bundle has been initialized with sbctl")
host_only_bundle: bool = Field(
False, description="Whether this bundle contains only host resources (no cluster resources)"
False,
description="Whether this bundle contains only host resources (no cluster resources)",
)


Expand All @@ -108,10 +113,12 @@ class InitializeBundleArgs(BaseModel):

source: str = Field(description="The source of the bundle (URL or local path)")
force: bool = Field(
False, description="Whether to force re-initialization if a bundle is already active"
False,
description="Whether to force re-initialization if a bundle is already active",
)
verbosity: Optional[str] = Field(
None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)"
None,
description="Verbosity level for response formatting (minimal|standard|verbose|debug)",
)

@field_validator("source")
Expand Down Expand Up @@ -180,10 +187,12 @@ class ListAvailableBundlesArgs(BaseModel):
"""

include_invalid: bool = Field(
False, description="Whether to include invalid or inaccessible bundles in the results"
False,
description="Whether to include invalid or inaccessible bundles in the results",
)
verbosity: Optional[str] = Field(
None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)"
None,
description="Verbosity level for response formatting (minimal|standard|verbose|debug)",
)


Expand Down Expand Up @@ -797,7 +806,8 @@ async def _initialize_with_sbctl(self, bundle_path: Path, output_dir: Path) -> P
)
try:
safe_copy_file(
announced_kubeconfig, kubeconfig_path
announced_kubeconfig,
kubeconfig_path,
)
logger.info(
f"Successfully copied kubeconfig to {kubeconfig_path}"
Expand Down Expand Up @@ -1190,7 +1200,10 @@ async def _wait_for_initialization(

# Search for any newly created kubeconfig files in common locations if enabled
if ALLOW_ALTERNATIVE_KUBECONFIG:
for pattern in ["/tmp/kubeconfig*", "/var/folders/*/*/local-kubeconfig-*"]:
for pattern in [
"/tmp/kubeconfig*",
"/var/folders/*/*/local-kubeconfig-*",
]:
for path in glob.glob(pattern):
kubeconfig_file = Path(path)
if kubeconfig_file not in alternative_kubeconfig_paths:
Expand Down Expand Up @@ -1348,7 +1361,10 @@ async def _terminate_sbctl_process(self) -> None:
logger.debug(
f"Process {pid} terminated successfully"
)
except (ProcessLookupError, PermissionError) as e:
except (
ProcessLookupError,
PermissionError,
) as e:
logger.debug(f"Error terminating process {pid}: {e}")
except (
psutil.NoSuchProcess,
Expand Down Expand Up @@ -2032,7 +2048,7 @@ async def list_available_bundles(self, include_invalid: bool = False) -> List[Bu
path=str(file_path),
relative_path=file_path.name,
name=file_path.name,
size_bytes=file_path.stat().st_size if file_path.exists() else 0,
size_bytes=(file_path.stat().st_size if file_path.exists() else 0),
modified_time=(
file_path.stat().st_mtime if file_path.exists() else 0
),
Expand Down Expand Up @@ -2144,7 +2160,11 @@ async def cleanup(self) -> None:
and any("sbctl" in arg for arg in proc.info["cmdline"])
):
sbctl_processes.append(proc)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
except (
psutil.NoSuchProcess,
psutil.AccessDenied,
psutil.ZombieProcess,
):
# Process disappeared or access denied - skip it
continue

Expand Down
19 changes: 14 additions & 5 deletions src/mcp_server_troubleshoot/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class ListFilesArgs(BaseModel):
path: str = Field(description="The path within the bundle to list")
recursive: bool = Field(False, description="Whether to list recursively")
verbosity: Optional[str] = Field(
None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)"
None,
description="Verbosity level for response formatting (minimal|standard|verbose|debug)",
)

@field_validator("path")
Expand Down Expand Up @@ -105,7 +106,8 @@ class ReadFileArgs(BaseModel):
None, description="The line number to end reading at (0-indexed, inclusive)"
)
verbosity: Optional[str] = Field(
None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)"
None,
description="Verbosity level for response formatting (minimal|standard|verbose|debug)",
)

@field_validator("path")
Expand Down Expand Up @@ -159,7 +161,8 @@ class GrepFilesArgs(BaseModel):
max_results_per_file: int = Field(5, description="Maximum number of results to return per file")
max_files: int = Field(10, description="Maximum number of files to search/return")
verbosity: Optional[str] = Field(
None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)"
None,
description="Verbosity level for response formatting (minimal|standard|verbose|debug)",
)

@field_validator("path")
Expand Down Expand Up @@ -280,7 +283,8 @@ class GrepResult(BaseModel):
case_sensitive: bool = Field(description="Whether the search was case-sensitive")
truncated: bool = Field(description="Whether the results were truncated due to max_results")
files_truncated: bool = Field(
default=False, description="Whether the file list was truncated due to max_files"
default=False,
description="Whether the file list was truncated due to max_files",
)


Expand Down Expand Up @@ -592,7 +596,12 @@ async def read_file(
binary=is_binary,
)

except (PathNotFoundError, InvalidPathError, ReadFileError, FileSystemError) as e:
except (
PathNotFoundError,
InvalidPathError,
ReadFileError,
FileSystemError,
) as e:
# Re-raise known errors
logger.error(f"Error reading file: {str(e)}")
raise
Expand Down
113 changes: 113 additions & 0 deletions src/mcp_server_troubleshoot/formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,119 @@ def format_error(self, error_message: str, diagnostics: Optional[Dict[str, Any]]
response += f"\n\nDiagnostic information:\n```json\n{json.dumps(diagnostics, separators=(',', ':'))}\n```"
return response

def format_overflow_message(self, tool_name: str, estimated_tokens: int, content: str) -> str:
"""
Format helpful overflow message with tool-specific guidance.

Args:
tool_name: Name of the MCP tool that generated the content
estimated_tokens: Estimated token count of the content
content: The original content that was too large

Returns:
Formatted overflow message with tool-specific suggestions and content preview
"""
# Default token limit (can be made configurable)
token_limit = 25000

# Base overflow message
message = f"Content too large ({estimated_tokens:,} tokens, limit: {token_limit:,}).\n\n"

# Tool-specific suggestions
suggestions = self._get_tool_specific_suggestions(tool_name)
if suggestions:
message += "Suggestions to reduce output size:\n"
for i, suggestion in enumerate(suggestions, 1):
message += f"{i}. {suggestion}\n"
message += "\n"

# Add content preview based on verbosity level
preview = self._generate_content_preview(content)
if preview:
message += preview

return message

def _get_tool_specific_suggestions(self, tool_name: str) -> List[str]:
"""Get tool-specific suggestions for reducing output size."""
suggestions_map = {
"list_files": [
"Use non-recursive listing: list_files(recursive=false)",
'Target specific directories: list_files(path="specific/subdirectory")',
'Use minimal verbosity: list_files(verbosity="minimal")',
],
"read_file": [
"Read specific line ranges: read_file(start_line=1, end_line=100)",
"Read smaller chunks: read_file(start_line=1, end_line=50)",
"Target specific sections of the file",
],
"grep_files": [
"Limit results per file: grep_files(max_results=50, max_files=5)",
'Use specific file patterns: grep_files(glob_pattern="*.yaml")',
'Target specific directories: grep_files(path="specific/subdirectory")',
'Use minimal verbosity: grep_files(verbosity="minimal")',
],
"kubectl": [
"Query specific resources: kubectl get pods --selector=app=myapp",
"Use different output formats: kubectl get pods -o name",
"Target specific namespaces: kubectl get pods -n specific-namespace",
"Limit results: kubectl get pods --limit=50",
],
"initialize_bundle": ['Use minimal verbosity: initialize_bundle(verbosity="minimal")'],
"list_bundles": ['Use minimal verbosity: list_bundles(verbosity="minimal")'],
}

return suggestions_map.get(
tool_name,
[
'Use minimal verbosity if supported: tool_name(verbosity="minimal")',
"Apply filtering parameters to reduce data scope",
"Target more specific data subsets",
],
)

def _generate_content_preview(self, content: str) -> str:
"""Generate a content preview based on verbosity level."""
if self.verbosity == VerbosityLevel.MINIMAL:
# Very brief preview for minimal mode
preview_chars = 100
preview_text = content[:preview_chars]
if len(content) > preview_chars:
preview_text += "..."
return f"First {preview_chars} characters:\n{preview_text}\n"

elif self.verbosity == VerbosityLevel.STANDARD:
# Moderate preview for standard mode
preview_chars = 200
preview_text = content[:preview_chars]
if len(content) > preview_chars:
preview_text += "..."
return f"Showing first {preview_chars} characters:\n{preview_text}\n"

else: # VERBOSE or DEBUG
# More detailed preview with structure info
preview_chars = 500
preview_text = content[:preview_chars]
if len(content) > preview_chars:
preview_text += "..."

# Try to provide structural information
lines = content.split("\n")
total_lines = len(lines)

preview_msg = (
f"Showing first {preview_chars} characters (content has {total_lines:,} lines):\n"
)
preview_msg += "```\n" + preview_text + "\n```\n"

# Add helpful context about the content structure
if content.strip().startswith("{") or content.strip().startswith("["):
preview_msg += "\n*Note: Content appears to be JSON format*\n"
elif "```" in content:
preview_msg += "\n*Note: Content contains code blocks or formatted sections*\n"

return preview_msg

def _format_file_size(self, size_bytes: int) -> str:
"""Format file size in human-readable format."""
if size_bytes < 1024:
Expand Down
3 changes: 2 additions & 1 deletion src/mcp_server_troubleshoot/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class KubectlCommandArgs(BaseModel):
timeout: int = Field(30, description="Timeout in seconds for the command")
json_output: bool = Field(False, description="Whether to format the output as JSON")
verbosity: Optional[str] = Field(
None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)"
None,
description="Verbosity level for response formatting (minimal|standard|verbose|debug)",
)

@field_validator("command")
Expand Down
Loading