1
1
#!/usr/bin/env python3
2
2
"""
3
- Database Manager MCP Server
3
+ Database MCP Server
4
4
5
- A comprehensive MCP server for database operations.
5
+ A comprehensive MCP server for database operations using FastMCP.
6
+ Demonstrates:
7
+ - SQL query execution
8
+ - Database connection management
9
+ - Type-safe operations
10
+ - Resource providers
6
11
"""
7
12
8
13
import asyncio
9
14
import sqlite3
10
- from typing import Dict , Any
15
+ from typing import Dict , List , Any , Optional
16
+ from datetime import datetime
11
17
12
- class DatabaseMCPServer :
13
- """Database Manager MCP Server"""
14
-
15
- def __init__ (self ):
16
- self .connections = {}
18
+ from mcp .server .fastmcp import FastMCP
19
+ import mcp .types as types
20
+ from pydantic import BaseModel
21
+
22
+ # Initialize FastMCP server
23
+ mcp = FastMCP ("database_tools" )
24
+
25
+ class QueryResult (BaseModel ):
26
+ """Model for query results"""
27
+ columns : List [str ]
28
+ rows : List [Dict [str , Any ]]
29
+ row_count : int
30
+ execution_time : float
31
+
32
+ class DatabaseConfig (BaseModel ):
33
+ """Database configuration model"""
34
+ type : str
35
+ connection_string : str
36
+ max_connections : int = 10
37
+
38
+ # System prompts
39
+ @mcp .prompt ()
40
+ def system_prompt () -> str :
41
+ """Define the database assistant's role"""
42
+ return """
43
+ You are a database assistant that helps users interact with databases safely.
44
+ Always validate queries before execution and use proper error handling.
45
+ Never execute dangerous operations without confirmation.
46
+ """
47
+
48
+ @mcp .prompt ()
49
+ def error_prompt () -> str :
50
+ """Handle database errors"""
51
+ return """
52
+ I encountered an error while working with the database.
53
+ This could be due to:
54
+ - Invalid SQL syntax
55
+ - Connection issues
56
+ - Permission problems
57
+ - Resource constraints
58
+ Please check your query and try again.
59
+ """
60
+
61
+ # Database resources
62
+ @mcp .resource ("db://{database}/{table}" )
63
+ async def get_table_info (database : str , table : str ) -> Dict :
64
+ """Get metadata about a database table"""
65
+ try :
66
+ conn = sqlite3 .connect (f"{ database } .db" )
67
+ cursor = conn .cursor ()
68
+ cursor .execute (f"PRAGMA table_info({ table } )" )
69
+ columns = cursor .fetchall ()
17
70
18
- async def connect_database (self , database_type : str , connection_string : str ) -> Dict [str , Any ]:
19
- """Connect to a database"""
20
- try :
21
- if database_type == "sqlite" :
22
- conn = sqlite3 .connect (connection_string )
23
- conn .row_factory = sqlite3 .Row
24
- self .connections ["default" ] = {"type" : "sqlite" , "connection" : conn }
25
-
26
- return {
27
- "success" : True ,
28
- "message" : f"Connected to { database_type } database" ,
29
- "database_type" : database_type
30
- }
31
- except Exception as e :
32
- return {"success" : False , "error" : str (e )}
71
+ return {
72
+ "table" : table ,
73
+ "columns" : [col [1 ] for col in columns ],
74
+ "types" : [col [2 ] for col in columns ],
75
+ "last_checked" : datetime .now ().isoformat ()
76
+ }
77
+ except Exception as e :
78
+ return {"error" : str (e )}
79
+
80
+ # Database tools
81
+ @mcp .tool ()
82
+ async def execute_query (query : str , limit : int = 100 ) -> Dict :
83
+ """
84
+ Execute a SQL query safely
33
85
34
- async def execute_query (self , query : str , limit : int = 100 ) -> Dict [str , Any ]:
35
- """Execute a SQL query"""
36
- try :
37
- if "default" not in self .connections :
38
- return {"success" : False , "error" : "No database connection" }
86
+ Args:
87
+ query: SQL query to execute
88
+ limit: Maximum number of rows to return
89
+
90
+ Returns:
91
+ Dictionary with query results or error information
92
+ """
93
+ try :
94
+ # In production, use connection pooling
95
+ conn = sqlite3 .connect ("demo.db" )
96
+ cursor = conn .cursor ()
97
+
98
+ start_time = datetime .now ()
99
+ cursor .execute (query )
100
+
101
+ if query .lower ().strip ().startswith ("select" ):
102
+ rows = cursor .fetchmany (limit )
103
+ columns = [desc [0 ] for desc in cursor .description ]
104
+ results = [dict (zip (columns , row )) for row in rows ]
39
105
40
- conn = self .connections ["default" ]["connection" ]
41
- cursor = conn .cursor ()
42
- cursor .execute (query )
106
+ execution_time = (datetime .now () - start_time ).total_seconds ()
43
107
44
- if query .lower ().strip ().startswith ("select" ):
45
- rows = cursor .fetchmany (limit )
46
- columns = [desc [0 ] for desc in cursor .description ]
47
- results = [dict (zip (columns , row )) for row in rows ]
48
-
49
- return {
50
- "success" : True ,
51
- "data" : results ,
52
- "row_count" : len (results )
53
- }
54
- else :
55
- conn .commit ()
56
- return {
57
- "success" : True ,
58
- "message" : "Query executed successfully" ,
59
- "rows_affected" : cursor .rowcount
108
+ return {
109
+ "success" : True ,
110
+ "data" : QueryResult (
111
+ columns = columns ,
112
+ rows = results ,
113
+ row_count = len (results ),
114
+ execution_time = execution_time
115
+ ).dict ()
116
+ }
117
+ else :
118
+ conn .commit ()
119
+ execution_time = (datetime .now () - start_time ).total_seconds ()
120
+
121
+ return {
122
+ "success" : True ,
123
+ "data" : {
124
+ "rows_affected" : cursor .rowcount ,
125
+ "execution_time" : execution_time
60
126
}
61
- except Exception as e :
62
- return {"success" : False , "error" : str (e )}
127
+ }
128
+
129
+ except Exception as e :
130
+ return {
131
+ "success" : False ,
132
+ "error" : str (e ),
133
+ "prompt" : "error_prompt"
134
+ }
63
135
64
- async def main ():
65
- """Demo the database MCP server"""
66
- db_server = DatabaseMCPServer ()
67
-
68
- print ("🗄️ Database Manager MCP Server Demo" )
69
- print ("=" * 40 )
70
-
71
- # Connect to demo database
72
- result = await db_server .connect_database ("sqlite" , "demo.db" )
73
- print (f"Connection: { result } " )
74
-
75
- # Create demo table
76
- create_query = """
77
- CREATE TABLE IF NOT EXISTS users (
78
- id INTEGER PRIMARY KEY,
79
- name TEXT,
80
- email TEXT
81
- )
136
+ @mcp .tool ()
137
+ async def get_table_schema (table : str ) -> Dict :
82
138
"""
83
- result = await db_server .execute_query (create_query )
84
- print (f"Create table: { result } " )
139
+ Get schema information for a table
85
140
86
- # Insert data
87
- insert_query = "INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com')"
88
- result = await db_server .execute_query (insert_query )
89
- print (f"Insert: { result } " )
141
+ Args:
142
+ table: Name of the table
143
+
144
+ Returns:
145
+ Dictionary with table schema information
146
+ """
147
+ try :
148
+ # Get schema from resource
149
+ schema = await get_table_info ("demo" , table )
150
+
151
+ if "error" in schema :
152
+ raise Exception (schema ["error" ])
153
+
154
+ return {
155
+ "success" : True ,
156
+ "data" : schema
157
+ }
158
+
159
+ except Exception as e :
160
+ return {
161
+ "success" : False ,
162
+ "error" : str (e ),
163
+ "prompt" : "error_prompt"
164
+ }
165
+
166
+ @mcp .tool ()
167
+ async def validate_query (query : str ) -> Dict :
168
+ """
169
+ Validate a SQL query without executing it
90
170
91
- # Query data
92
- select_query = "SELECT * FROM users"
93
- result = await db_server .execute_query (select_query )
94
- print (f"Query results: { result } " )
171
+ Args:
172
+ query: SQL query to validate
173
+
174
+ Returns:
175
+ Dictionary with validation results
176
+ """
177
+ try :
178
+ conn = sqlite3 .connect (":memory:" )
179
+ cursor = conn .cursor ()
180
+
181
+ # SQLite will parse but not execute in prepare mode
182
+ cursor .execute (f"EXPLAIN QUERY PLAN { query } " )
183
+ plan = cursor .fetchall ()
184
+
185
+ return {
186
+ "success" : True ,
187
+ "data" : {
188
+ "is_valid" : True ,
189
+ "query_plan" : plan ,
190
+ "validation_time" : datetime .now ().isoformat ()
191
+ }
192
+ }
193
+
194
+ except Exception as e :
195
+ return {
196
+ "success" : False ,
197
+ "error" : str (e ),
198
+ "data" : {
199
+ "is_valid" : False ,
200
+ "validation_time" : datetime .now ().isoformat ()
201
+ }
202
+ }
95
203
96
204
if __name__ == "__main__" :
97
- asyncio .run (main ())
205
+ # Run the MCP server
206
+ print ("🗄️ Starting Database MCP Server..." )
207
+ mcp .run (transport = "streamable-http" )
0 commit comments