Skip to content

Commit 20d444a

Browse files
fix: improve connection pool error handling
1 parent 67409a3 commit 20d444a

File tree

3 files changed

+91
-39
lines changed

3 files changed

+91
-39
lines changed

supabase_mcp/logger.py

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,43 @@
1+
import json
12
import logging
23
import logging.handlers
34
from pathlib import Path
5+
from typing import Any
6+
7+
8+
class StructuredLogFormatter(logging.Formatter):
9+
"""
10+
Custom formatter that outputs logs in a structured JSON format.
11+
Includes error tracing and context information.
12+
"""
13+
14+
def format(self, record: logging.LogRecord) -> str:
15+
log_record: dict[str, Any] = {
16+
"timestamp": self.formatTime(record, self.datefmt),
17+
"level": record.levelname,
18+
"message": record.getMessage(),
19+
"module": record.module,
20+
"function": record.funcName,
21+
"line": record.lineno,
22+
}
23+
24+
# Add exception info if present with full traceback
25+
if record.exc_info:
26+
log_record["exception"] = self.formatException(record.exc_info)
27+
# Add exception type for easier filtering
28+
exc_type = record.exc_info[0].__name__ if record.exc_info[0] else "Unknown"
29+
log_record["exception_type"] = exc_type
30+
31+
# Add extra fields from record
32+
extra_data = getattr(record, "extra", None)
33+
if extra_data and isinstance(extra_data, dict):
34+
log_record.update(extra_data)
35+
36+
return json.dumps(log_record, ensure_ascii=False)
437

538

639
def setup_logger() -> logging.Logger:
7-
"""Configure logging for the MCP server with log rotation."""
40+
"""Configure logging for the MCP server with structured logging and log rotation."""
841
logger = logging.getLogger("supabase-mcp")
942

1043
# Remove existing handlers to avoid duplicate logs
@@ -13,28 +46,26 @@ def setup_logger() -> logging.Logger:
1346

1447
# Define a consistent log directory in the user's home folder
1548
log_dir = Path.home() / ".local" / "share" / "supabase-mcp"
16-
log_dir.mkdir(parents=True, exist_ok=True) # Ensure the directory exists
49+
log_dir.mkdir(parents=True, exist_ok=True)
1750

1851
# Define the log file path
1952
log_file = log_dir / "mcp_server.log"
2053

21-
# Create a rotating file handler
22-
# - Rotate when log reaches 5MB
23-
# - Keep 3 backup files
54+
# Create handlers
2455
file_handler = logging.handlers.RotatingFileHandler(
2556
log_file,
2657
maxBytes=5 * 1024 * 1024, # 5MB
2758
backupCount=3,
2859
encoding="utf-8",
2960
)
3061

31-
# Create formatter
32-
formatter = logging.Formatter("[%(asctime)s] %(levelname)-8s %(message)s", datefmt="%y/%m/%d %H:%M:%S")
62+
# Create structured formatter
63+
formatter = StructuredLogFormatter()
3364

34-
# Add formatter to file handler
65+
# Add formatter to handlers
3566
file_handler.setFormatter(formatter)
3667

37-
# Add handler to logger
68+
# Add handlers to logger
3869
logger.addHandler(file_handler)
3970

4071
# Set level
@@ -43,4 +74,5 @@ def setup_logger() -> logging.Logger:
4374
return logger
4475

4576

77+
# Create the global logger instance
4678
logger = setup_logger()

supabase_mcp/services/database/postgres_client.py

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -203,12 +203,12 @@ async def create_pool(self) -> asyncpg.Pool[asyncpg.Record]:
203203
f"4. Your Supabase project is active and the database is online\n"
204204
)
205205

206-
logger.error(f"Failed to connect to database: {e}")
206+
logger.error(f"FAILED to connect to database: {e}")
207207
logger.error(f"Connection details: {host_part}, Project: {self.project_ref}, Region: {self.db_region}")
208208

209209
raise ConnectionError(error_message) from e
210210

211-
except OSError as e:
211+
except (OSError, asyncpg.InterfaceError, asyncpg.TooManyConnectionsError) as e:
212212
# For network-related errors, provide a different message that clearly indicates
213213
# this is a network/system issue rather than a database configuration problem
214214
host_part = self.db_url.split("@")[1].split("/")[0] if "@" in self.db_url else "unknown"
@@ -228,6 +228,10 @@ async def create_pool(self) -> asyncpg.Pool[asyncpg.Record]:
228228
logger.error(f"Connection details: {host_part}")
229229
raise ConnectionError(error_message) from e
230230

231+
except Exception as e:
232+
logger.error(f"Failed to connect to database: {e}")
233+
raise ConnectionError(f"Failed to connect to database: {e}") from e
234+
231235
async def ensure_pool(self) -> None:
232236
"""Ensure a valid connection pool exists.
233237
@@ -360,7 +364,6 @@ async def execute_query(
360364
QueryResult containing the results of all statements
361365
362366
Raises:
363-
ConnectionError: If a database connection issue occurs
364367
QueryError: If the query execution fails
365368
PermissionError: When user lacks required privileges
366369
"""
@@ -372,24 +375,37 @@ async def execute_query(
372375
)
373376
logger.debug(f"Executing query (readonly={readonly}): {truncated_query}")
374377

375-
# Define the operation to execute all statements within a transaction
376-
async def execute_all_statements(conn):
377-
async def transaction_operation():
378-
results = []
379-
for statement in validated_query.statements:
380-
if statement.query: # Skip statements with no query
381-
result = await self.execute_statement(conn, statement.query)
382-
results.append(result)
383-
else:
384-
logger.warning(f"Statement has no query, statement: {statement}")
385-
return results
386-
387-
# Execute the operation within a transaction
388-
results = await self.with_transaction(conn, transaction_operation, readonly)
389-
return QueryResult(results=results)
390-
391-
# Execute the operation with a connection
392-
return await self.with_connection(execute_all_statements)
378+
try:
379+
# Define the operation to execute all statements within a transaction
380+
async def execute_all_statements(conn: asyncpg.Connection[Any]) -> QueryResult:
381+
async def transaction_operation() -> list[StatementResult]:
382+
results = []
383+
for statement in validated_query.statements:
384+
if statement.query: # Skip statements with no query
385+
result = await self.execute_statement(conn, statement.query)
386+
results.append(result)
387+
else:
388+
logger.warning(f"Statement has no query, statement: {statement}")
389+
return results
390+
391+
# Execute the operation within a transaction
392+
results = await self.with_transaction(conn, transaction_operation, readonly)
393+
return QueryResult(results=results)
394+
395+
# Execute the operation with a connection
396+
return await self.with_connection(execute_all_statements)
397+
398+
except ConnectionError as e:
399+
logger.error(f"Query execution failed because of connection error: {e}")
400+
raise QueryError(f"Query execution failed because of connection error: {str(e)}") from e
401+
402+
except PermissionError as e:
403+
logger.error(f"Query execution failed because of permission error: {e}")
404+
raise PermissionError(f"Query execution failed because of permission error: {str(e)}") from e
405+
406+
except QueryError as e:
407+
logger.error(f"Query execution failed: {e}")
408+
raise QueryError(f"Query execution failed: {str(e)}") from e
393409

394410
async def _handle_postgres_error(self, error: asyncpg.PostgresError) -> None:
395411
"""Handle PostgreSQL errors and convert to appropriate exceptions.
@@ -408,10 +424,7 @@ async def _handle_postgres_error(self, error: asyncpg.PostgresError) -> None:
408424
) from error
409425
elif isinstance(
410426
error,
411-
(
412-
asyncpg.exceptions.UndefinedTableError,
413-
asyncpg.exceptions.UndefinedColumnError,
414-
),
427+
(asyncpg.exceptions.UndefinedTableError | asyncpg.exceptions.UndefinedColumnError),
415428
):
416429
logger.error(f"Schema error: {error}")
417430
raise QueryError(str(error)) from error

supabase_mcp/services/database/query_manager.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from supabase_mcp.exceptions import OperationNotAllowedError
1+
from supabase_mcp.exceptions import OperationNotAllowedError, PermissionError, QueryError
22
from supabase_mcp.logger import logger
33
from supabase_mcp.services.database.migration_manager import MigrationManager
44
from supabase_mcp.services.database.postgres_client import PostgresClient, QueryResult
@@ -104,10 +104,17 @@ async def handle_query_execution(self, validated_query: QueryValidationResults)
104104
Returns:
105105
QueryResult: The result of the query execution
106106
"""
107-
readonly = self.check_readonly()
108-
result = await self.db_client.execute_query(validated_query, readonly)
109-
logger.debug(f"Query result: {result}")
110-
return result
107+
try:
108+
readonly = self.check_readonly()
109+
result = await self.db_client.execute_query(validated_query, readonly)
110+
logger.debug(f"Query result: {result}")
111+
return result
112+
except (QueryError, PermissionError) as e:
113+
logger.error(f"Query execution failed: {e}")
114+
raise QueryError(f"Query execution failed: {str(e)}") from e
115+
except Exception as e:
116+
logger.error(f"Unexpected error during query execution: {e}")
117+
raise QueryError(f"Unexpected error during query execution: {str(e)}") from e
111118

112119
async def handle_migration(
113120
self, validation_result: QueryValidationResults, original_query: str, migration_name: str = ""

0 commit comments

Comments
 (0)