🎯 Advanced MCP Server Development
📋 Building on Part 1 Foundation
In Part 1, we built a basic MCP server with utility tools. Now we’ll create production-ready servers with:
- Database Integration: PostgreSQL, MySQL, SQLite connections with query tools
- API Connectors: REST API integration and testing capabilities
- Real-time Processing: WebSocket connections and live data streams
- Production Deployment: Docker containers, monitoring, and scaling
- Security: Authentication, authorization, and audit logging
🏗️ Advanced MCP Architecture
graph TD
A[Claude Code] --> B[MCP Client]
B --> C[Advanced MCP Server]
C --> D[Database Manager]
C --> E[API Connector Hub]
C --> F[Real-time Processor]
C --> G[Security Layer]
D --> H[PostgreSQL Pool]
D --> I[MySQL Pool]
D --> J[SQLite Handler]
D --> K[Query Executor]
E --> L[REST Client]
E --> M[GraphQL Client]
E --> N[WebSocket Client]
E --> O[Auth Manager]
F --> P[Event Streams]
F --> Q[Data Pipelines]
F --> R[Cache Layer]
G --> S[JWT Validation]
G --> T[Rate Limiting]
G --> U[Audit Logging]
G --> V[Permission Engine]
style C fill:#34a853
style D fill:#4285f4
style E fill:#ea4335
style F fill:#fbbc04
style G fill:#ff6d01
🗄️ Database Integration & Query Tools
📊 Multi-Database Connection Manager
Let’s create a robust database integration system that supports multiple database types:
database_manager.py:
#!/usr/bin/env python3
"""
Advanced Database Manager for MCP Server
Supports PostgreSQL, MySQL, SQLite with connection pooling
"""

import asyncio
import asyncpg
import aiomysql
import aiosqlite
import logging
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass
from contextlib import asynccontextmanager
import json
from datetime import datetime
import hashlib

logger = logging.getLogger(__name__)

@dataclass
class DatabaseConfig:
    """Database connection configuration"""
    db_type: str  # 'postgresql', 'mysql', 'sqlite'
    host: Optional[str] = None
    port: Optional[int] = None
    database: Optional[str] = None  # Not needed for SQLite
    username: Optional[str] = None
    password: Optional[str] = None
    file_path: Optional[str] = None  # For SQLite
    pool_size: int = 10
    max_overflow: int = 20  # Not referenced by the pools created in this module
    timeout: int = 30

class DatabaseManager:
    """Advanced database connection manager with pooling"""

    def __init__(self, configs: Dict[str, DatabaseConfig]):
        self.configs = configs
        self.pools = {}
        self.query_cache = {}
        self.query_stats = {}

    async def initialize(self):
        """Initialize database connections and pools"""
        for name, config in self.configs.items():
            try:
                if config.db_type == 'postgresql':
                    self.pools[name] = await asyncpg.create_pool(
                        host=config.host,
                        port=config.port or 5432,
                        database=config.database,
                        user=config.username,
                        password=config.password,
                        min_size=1,
                        max_size=config.pool_size,
                        command_timeout=config.timeout
                    )

                elif config.db_type == 'mysql':
                    self.pools[name] = await aiomysql.create_pool(
                        host=config.host,
                        port=config.port or 3306,
                        db=config.database,
                        user=config.username,
                        password=config.password,
                        minsize=1,
                        maxsize=config.pool_size,
                        autocommit=False
                    )

                elif config.db_type == 'sqlite':
                    # SQLite doesn't use connection pools in the same way;
                    # only the file path is stored and a connection is opened per use
                    self.pools[name] = config.file_path

                logger.info(f"Initialized {config.db_type} connection pool: {name}")

            except Exception as e:
                logger.error(f"Failed to initialize database {name}: {e}")
                raise

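    async def close(self):
        """Close all database connections"""
        for name, pool in self.pools.items():
            try:
                db_type = self.configs[name].db_type
                if db_type == 'postgresql':
                    # asyncpg's Pool.close() is a coroutine and must be awaited
                    await pool.close()
                elif db_type == 'mysql':
                    # aiomysql's Pool.close() is synchronous; wait_closed() completes the shutdown
                    pool.close()
                    await pool.wait_closed()
                # For SQLite only a file path is stored, so there is nothing to close
                logger.info(f"Closed database connection: {name}")
            except Exception as e:
                logger.error(f"Error closing database {name}: {e}")
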
    @asynccontextmanager
    async def get_connection(self, db_name: str):
        """Get database connection from pool"""
        if db_name not in self.pools:
            raise ValueError(f"Database {db_name} not configured")

        config = self.configs[db_name]

        if config.db_type in ('postgresql', 'mysql'):
            # Both pool types hand out connections through acquire()
            async with self.pools[db_name].acquire() as conn:
                yield conn

        elif config.db_type == 'sqlite':
            # Open a fresh connection to the stored file path
            async with aiosqlite.connect(self.pools[db_name]) as conn:
                yield conn

    async def execute_query(
        self,
        db_name: str,
        query: str,
        params: Optional[List] = None,
        fetch_type: str = 'all'  # 'all', 'one', 'none'
    ) -> Dict[str, Any]:
        """Execute database query with safety checks and logging"""

        # Security check
        if not self._is_safe_query(query):
            raise ValueError("Query contains potentially unsafe operations")

        # MD5 is used only as a short key for the statistics table, not for security
        query_hash = hashlib.md5(f"{db_name}:{query}".encode()).hexdigest()
        start_time = datetime.now()

        try:
            async with self.get_connection(db_name) as conn:
                config = self.configs[db_name]

                if config.db_type == 'postgresql':
                    if fetch_type == 'all':
                        rows = await conn.fetch(query, *(params or []))
                        result = [dict(row) for row in rows]
                    elif fetch_type == 'one':
                        row = await conn.fetchrow(query, *(params or []))
                        result = dict(row) if row else None
                    else:  # fetch_type == 'none'
                        await conn.execute(query, *(params or []))
                        result = {"status": "executed"}

                elif config.db_type == 'mysql':
                    async with conn.cursor() as cursor:
                        await cursor.execute(query, params or [])

                        if fetch_type == 'all':
                            rows = await cursor.fetchall()
                            # Get column names
                            columns = [desc[0] for desc in cursor.description] if cursor.description else []
                            result = [dict(zip(columns, row)) for row in rows] if rows else []
                        elif fetch_type == 'one':
                            row = await cursor.fetchone()
                            if row and cursor.description:
                                columns = [desc[0] for desc in cursor.description]
                                result = dict(zip(columns, row))
                            else:
                                result = None
                        else:  # fetch_type == 'none'
                            result = {"status": "executed", "rowcount": cursor.rowcount}

                    # The pool is created with autocommit=False, so end the transaction
                    # before the connection goes back to the pool, even after a read
                    await conn.commit()

                elif config.db_type == 'sqlite':
                    if fetch_type == 'all':
                        async with conn.execute(query, params or []) as cursor:
                            rows = await cursor.fetchall()
                            columns = [desc[0] for desc in cursor.description] if cursor.description else []
                            result = [dict(zip(columns, row)) for row in rows] if rows else []
                    elif fetch_type == 'one':
                        async with conn.execute(query, params or []) as cursor:
                            row = await cursor.fetchone()
                            if row and cursor.description:
                                columns = [desc[0] for desc in cursor.description]
                                result = dict(zip(columns, row))
                            else:
                                result = None
                    else:  # fetch_type == 'none'
                        await conn.execute(query, params or [])
                        await conn.commit()
                        result = {"status": "executed"}

                # Log query statistics
                execution_time = (datetime.now() - start_time).total_seconds()
                self._record_query_stats(query_hash, query, execution_time, True)

                return {
                    "success": True,
                    "data": result,
                    "execution_time_ms": execution_time * 1000,
                    "query_hash": query_hash
                }

        except Exception as e:
            execution_time = (datetime.now() - start_time).total_seconds()
            self._record_query_stats(query_hash, query, execution_time, False, str(e))

            return {
                "success": False,
                "error": str(e),
                "execution_time_ms": execution_time * 1000,
                "query_hash": query_hash
            }

    def _is_safe_query(self, query: str) -> bool:
        """Basic query safety validation"""
        query_lower = query.lower().strip()

        # Block potentially dangerous operations
        dangerous_operations = [
            'drop table', 'drop database', 'truncate',
            'delete from', 'update ', 'insert into',
            'create user', 'grant ', 'revoke ',
            'alter table', 'create database'
        ]

        # Allow only read-only statements by default: SELECT, plus EXPLAIN so that
        # query plans can be requested through execute_query()
        # In production, you might want more sophisticated validation
        if not query_lower.startswith(('select', 'explain')):
            return False

        # Substring matching is deliberately conservative: it also rejects harmless
        # queries that merely contain one of these phrases
        for dangerous in dangerous_operations:
            if dangerous in query_lower:
                return False

        return True

    def _record_query_stats(
        self,
        query_hash: str,
        query: str,
        execution_time: float,
        success: bool,
        error: Optional[str] = None
    ):
        """Record query execution statistics"""
        if query_hash not in self.query_stats:
            self.query_stats[query_hash] = {
                # Store at most the first 200 characters of the query text
                'query': query[:200] + '...' if len(query) > 200 else query,
                'execution_count': 0,
                'total_time': 0,
                'avg_time': 0,
                'success_count': 0,
                'error_count': 0,
                'last_executed': None,
                'last_error': None
            }

        stats = self.query_stats[query_hash]
        stats['execution_count'] += 1
        stats['total_time'] += execution_time
        stats['avg_time'] = stats['total_time'] / stats['execution_count']
        stats['last_executed'] = datetime.now().isoformat()

        if success:
            stats['success_count'] += 1
        else:
            stats['error_count'] += 1
            stats['last_error'] = error

    async def get_schema_info(self, db_name: str, table_name: Optional[str] = None) -> Dict[str, Any]:
        """Get database schema information"""
        config = self.configs[db_name]

        try:
            if config.db_type == 'postgresql':
                if table_name:
                    query = """
                        SELECT column_name, data_type, is_nullable, column_default
                        FROM information_schema.columns
                        WHERE table_name = $1 AND table_schema = 'public'
                        ORDER BY ordinal_position
                    """
                    result = await self.execute_query(db_name, query, [table_name])
                else:
                    query = """
                        SELECT table_name, table_type
                        FROM information_schema.tables
                        WHERE table_schema = 'public'
                        ORDER BY table_name
                    """
                    result = await self.execute_query(db_name, query)

            elif config.db_type == 'mysql':
                if table_name:
                    query = """
                        SELECT COLUMN_NAME as column_name, DATA_TYPE as data_type,
                               IS_NULLABLE as is_nullable, COLUMN_DEFAULT as column_default
                        FROM INFORMATION_SCHEMA.COLUMNS
                        WHERE TABLE_NAME = %s AND TABLE_SCHEMA = DATABASE()
                        ORDER BY ORDINAL_POSITION
                    """
                    result = await self.execute_query(db_name, query, [table_name])
                else:
                    query = """
                        SELECT TABLE_NAME as table_name, TABLE_TYPE as table_type
                        FROM INFORMATION_SCHEMA.TABLES
                        WHERE TABLE_SCHEMA = DATABASE()
                        ORDER BY TABLE_NAME
                    """
                    result = await self.execute_query(db_name, query)

            elif config.db_type == 'sqlite':
                if table_name:
                    # pragma_table_info() is the table-valued form of PRAGMA table_info
                    # (SQLite 3.16+). The statement stays a parameterized SELECT, so the
                    # table name is never interpolated into the SQL text and the query
                    # passes _is_safe_query(), which a bare PRAGMA statement would not.
                    query = "SELECT * FROM pragma_table_info(?)"
                    result = await self.execute_query(db_name, query, [table_name])
                else:
                    query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
                    result = await self.execute_query(db_name, query)

            return result

        except Exception as e:
            return {
                "success": False,
                "error": f"Failed to get schema info: {str(e)}"
            }

    def get_query_statistics(self) -> Dict[str, Any]:
        """Get query execution statistics"""
        return {
            # Number of distinct (database, query) pairs seen, not the total execution count
            "total_queries": len(self.query_stats),
            "statistics": self.query_stats
        }
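The substring checks in `_is_safe_query` are easy to reason about but coarse: a harmless query such as `SELECT last_update FROM orders` is rejected because it contains `update `, and the check does not look at statement boundaries. The sketch below shows one possible direction for the "more sophisticated validation" the code comment mentions. It is a hypothetical helper, not part of `database_manager.py`: `is_read_only_query` and the regular expressions are assumptions made for illustration, and the function rejects anything it cannot parse (backslash escapes, `#` comments, dollar quoting) rather than trying to support every dialect. It is a complement to, not a replacement for, connecting with a database role that only has read permissions.

```python
import re

# Hypothetical helper for illustration; it is not part of database_manager.py.

# Dialect-specific syntax this sketch does not try to parse (backslash escapes,
# MySQL '#' comments, PostgreSQL dollar quoting). Such queries are rejected outright.
_UNSUPPORTED = re.compile(r"\\|#|\$\w*\$")

# Comments, string literals and quoted identifiers, matched in one left-to-right
# pass so that a quote inside a comment (or a comment marker inside a string) is
# attributed to whichever construct starts first.
_NOISE = re.compile(
    "|".join([
        r"--[^\n]*",        # line comment
        r"/\*.*?\*/",       # block comment
        r"'(?:[^']|'')*'",  # string literal
        r'"(?:[^"]|"")*"',  # double-quoted identifier
        r"`[^`]*`",         # backtick-quoted identifier
    ]),
    re.DOTALL,
)

# Whole-word matches only, so column names like last_update are not flagged
_WRITE_KEYWORDS = re.compile(
    r"\b(insert|update|delete|drop|truncate|alter|create|grant|revoke|into)\b"
)


def is_read_only_query(query: str) -> bool:
    """Return True only for a single SELECT statement without write keywords."""
    if _UNSUPPORTED.search(query):
        return False

    # Blank out comments, literals and quoted identifiers before inspecting keywords
    stripped = _NOISE.sub(" ", query).strip().rstrip(";").strip().lower()

    if not re.match(r"select\b", stripped):
        return False
    if ";" in stripped:  # a second statement follows the first
        return False
    return _WRITE_KEYWORDS.search(stripped) is None


print(is_read_only_query("SELECT last_update FROM orders"))
print(is_read_only_query("SELECT 1; DROP TABLE users"))
```

The first call is accepted because `update` only appears inside a longer identifier; the second is rejected because a second statement follows the semicolon. Keyword filtering of this kind remains a heuristic, so it is best treated as a second line of defense behind database-level permissions.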
🛠️ Advanced Database Tools for MCP Server
Now let’s integrate the database manager into our MCP server:
advanced_server.py:
#!/usr/bin/env python3
"""
Advanced MCP Server with Database Integration
"""

import asyncio
import json
import logging
from datetime import datetime  # used by the health-check handler
from typing import Dict, List, Any, Optional
from pathlib import Path

from mcp.server.models import InitializationOptions
from mcp.server import Server, NotificationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

from database_manager import DatabaseManager, DatabaseConfig

logger = logging.getLogger("advanced-mcp-server")

class AdvancedMCPServer:
    def __init__(self, config_path: str = "advanced_config.json"):
        self.server = Server("advanced-mcp-server")
        self.config_path = config_path
        self.db_manager = None
        self.config = None

        self._setup_tools()

    async def initialize(self):
        """Initialize server and database connections"""
        await self._load_config()
        await self._setup_database_manager()

    async def _load_config(self):
        """Load server configuration"""
        try:
            with open(self.config_path) as f:
                self.config = json.load(f)
        except Exception as e:
            logger.error(f"Failed to load config: {e}")
            raise

    async def _setup_database_manager(self):
        """Setup database manager with configured connections"""
        if 'databases' not in self.config:
            logger.warning("No database configurations found")
            return

        # Each entry's keys must match the DatabaseConfig field names
        db_configs = {}
        for name, db_config in self.config['databases'].items():
            db_configs[name] = DatabaseConfig(**db_config)

        self.db_manager = DatabaseManager(db_configs)
        await self.db_manager.initialize()

    def _setup_tools(self):
        """Setup MCP tools"""

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            return [
                Tool(
                    name="execute_sql_query",
                    description="Execute SQL query on specified database",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database connection name"
                            },
                            "query": {
                                "type": "string",
                                "description": "SQL query to execute (SELECT only for security)"
                            },
                            "params": {
                                "type": "array",
                                "description": "Query parameters",
                                "items": {"type": "string"},
                                "default": []
                            },
                            "limit": {
                                "type": "integer",
                                "description": "Maximum number of rows to return",
                                "default": 100,
                                "maximum": 1000
                            }
                        },
                        "required": ["database", "query"]
                    }
                ),
                Tool(
                    name="get_database_schema",
                    description="Get database schema information",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database connection name"
                            },
                            "table": {
                                "type": "string",
                                "description": "Specific table name (optional)"
                            }
                        },
                        "required": ["database"]
                    }
                ),
                Tool(
                    name="database_health_check",
                    description="Check database connection health and statistics",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database connection name (optional - checks all if not specified)"
                            }
                        }
                    }
                ),
                Tool(
                    name="query_builder_assistant",
                    description="Help build SQL queries based on natural language description",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database connection name"
                            },
                            "description": {
                                "type": "string",
                                "description": "Natural language description of what you want to query"
                            },
                            "tables": {
                                "type": "array",
                                "description": "Tables to include in the query",
                                "items": {"type": "string"},
                                "default": []
                            }
                        },
                        "required": ["database", "description"]
                    }
                ),
                Tool(
                    name="explain_query_plan",
                    description="Explain SQL query execution plan",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database connection name"
                            },
                            "query": {
                                "type": "string",
                                "description": "SQL query to explain"
                            }
                        },
                        "required": ["database", "query"]
                    }
                )
            ]

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
            if not self.db_manager:
                return [TextContent(
                    type="text",
                    text="Database manager not initialized"
                )]

            # Map tool names to their handler methods
            handlers = {
                "execute_sql_query": self._handle_sql_query,
                "get_database_schema": self._handle_schema_info,
                "database_health_check": self._handle_health_check,
                "query_builder_assistant": self._handle_query_builder,
                "explain_query_plan": self._handle_explain_plan,
            }

            handler = handlers.get(name)
            if handler is None:
                return [TextContent(
                    type="text",
                    text=f"Unknown tool: {name}"
                )]

            try:
                return await handler(arguments)
            except Exception as e:
                logger.error(f"Tool execution error for {name}: {e}")
                return [TextContent(
                    type="text",
                    text=f"Error executing {name}: {str(e)}"
                )]

    async def _handle_sql_query(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle SQL query execution"""
        database = arguments.get("database")
        query = arguments.get("query") or ""
        params = arguments.get("params", [])
        # Clamp the row limit to the 1-1000 range advertised in the tool schema
        limit = max(1, min(int(arguments.get("limit", 100)), 1000))

        # Add LIMIT clause if not present in SELECT queries
        # (checked word by word, so a column such as "rate_limit" does not count)
        if query.strip().lower().startswith("select") and "limit" not in query.lower().split():
            query = f"{query.strip().rstrip(';')} LIMIT {limit}"

        result = await self.db_manager.execute_query(
            database, query, params, fetch_type='all'
        )

        if result["success"]:
            data = result["data"]
            execution_time = result["execution_time_ms"]

            # Truncate once so row_count always matches the rows actually returned
            rows = data[:limit] if isinstance(data, list) else data

            response = {
                "query": query,
                "database": database,
                "execution_time_ms": execution_time,
                "row_count": len(rows) if isinstance(rows, list) else 0,
                "results": rows
            }

            return [TextContent(
                type="text",
                text=f"SQL Query Results:\n{json.dumps(response, indent=2, default=str)}"
            )]
        else:
            return [TextContent(
                type="text",
                text=f"Query failed: {result['error']}"
            )]

    async def _handle_schema_info(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle database schema information request"""
        database = arguments.get("database")
        table = arguments.get("table")

        result = await self.db_manager.get_schema_info(database, table)

        if result["success"]:
            return [TextContent(
                type="text",
                text=f"Database Schema Information:\n{json.dumps(result['data'], indent=2, default=str)}"
            )]
        else:
            return [TextContent(
                type="text",
                text=f"Schema query failed: {result['error']}"
            )]

    async def _handle_health_check(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle database health check"""
        specific_db = arguments.get("database")

        health_info = {
            "timestamp": datetime.now().isoformat(),
            "databases": {}
        }

        databases_to_check = [specific_db] if specific_db else list(self.db_manager.configs.keys())

        for db_name in databases_to_check:
            try:
                # Test basic connectivity with simple query
                result = await self.db_manager.execute_query(
                    db_name, "SELECT 1 as health_check", fetch_type='one'
                )

                health_info["databases"][db_name] = {
                    "status": "healthy" if result["success"] else "unhealthy",
                    "response_time_ms": result["execution_time_ms"],
                    "error": result.get("error")
                }

            except Exception as e:
                health_info["databases"][db_name] = {
                    "status": "unhealthy",
                    "error": str(e)
                }

        # Add query statistics
        health_info["query_statistics"] = self.db_manager.get_query_statistics()

        return [TextContent(
            type="text",
            text=f"Database Health Check:\n{json.dumps(health_info, indent=2, default=str)}"
        )]

    async def _handle_query_builder(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle query builder assistance"""
        database = arguments.get("database")
        description = arguments.get("description")
        tables = arguments.get("tables", [])

        # Get schema information to help build query
        schema_result = await self.db_manager.get_schema_info(database)

        if not schema_result["success"]:
            return [TextContent(
                type="text",
                text=f"Could not retrieve schema: {schema_result['error']}"
            )]

        available_tables = schema_result["data"]

        # Simple query building logic based on description
        # In a production system, you might use NLP or ML for better query generation
        query_suggestions = self._generate_query_suggestions(description, available_tables, tables)

        response = {
            "description": description,
            "database": database,
            # PostgreSQL and MySQL rows use "table_name"; SQLite rows use "name"
            "available_tables": [table.get("table_name", table.get("name")) for table in available_tables],
            "suggested_queries": query_suggestions,
            "note": "Review these suggestions and modify as needed. Always test queries in a safe environment."
        }

        return [TextContent(
            type="text",
            text=f"Query Builder Assistant:\n{json.dumps(response, indent=2)}"
        )]

    async def _handle_explain_plan(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle query execution plan explanation"""
        database = arguments.get("database")
        query = arguments.get("query")

        if database not in self.db_manager.configs:
            return [TextContent(
                type="text",
                text=f"Database {database} not configured"
            )]

        config = self.db_manager.configs[database]

        # Only explain statements that pass the same read-only check as execute_sql_query
        if not query or not self.db_manager._is_safe_query(query):
            return [TextContent(
                type="text",
                text="Only SELECT queries can be explained"
            )]

        # Build EXPLAIN query based on database type
        if config.db_type == 'postgresql':
            # Note: ANALYZE makes PostgreSQL actually run the query to collect timings
            explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
        elif config.db_type == 'mysql':
            explain_query = f"EXPLAIN FORMAT=JSON {query}"
        elif config.db_type == 'sqlite':
            explain_query = f"EXPLAIN QUERY PLAN {query}"
        else:
            return [TextContent(
                type="text",
                text=f"EXPLAIN not supported for database type: {config.db_type}"
            )]

        result = await self.db_manager.execute_query(
            database, explain_query, fetch_type='all'
        )

        if result["success"]:
            return [TextContent(
                type="text",
                text=f"Query Execution Plan:\n{json.dumps(result['data'], indent=2, default=str)}"
            )]
        else:
            return [TextContent(
                type="text",
                text=f"EXPLAIN failed: {result['error']}"
            )]

    def _generate_query_suggestions(self, description: str, available_tables: List[Dict], target_tables: List[str]) -> List[str]:
        """Generate SQL query suggestions based on description"""
        suggestions = []

        # Simple keyword-based query generation
        # In production, you'd use more sophisticated NLP
        description_lower = description.lower()

        if target_tables:
            tables_str = ", ".join(target_tables)
        elif available_tables:
            # Use first table as default
            tables_str = available_tables[0].get("table_name", available_tables[0].get("name", "table_name"))
        else:
            # The schema lookup returned no tables; fall back to a placeholder name
            tables_str = "table_name"

        if "count" in description_lower:
            suggestions.append(f"SELECT COUNT(*) FROM {tables_str}")

        if "all" in description_lower or "everything" in description_lower:
            suggestions.append(f"SELECT * FROM {tables_str} LIMIT 10")

        # The next two suggestions assume created_at and id columns exist
        if "recent" in description_lower or "latest" in description_lower:
            suggestions.append(f"SELECT * FROM {tables_str} ORDER BY created_at DESC LIMIT 10")

        if "top" in description_lower:
            suggestions.append(f"SELECT * FROM {tables_str} ORDER BY id DESC LIMIT 10")

        # Default suggestion
        if not suggestions:
            suggestions.append(f"SELECT * FROM {tables_str} LIMIT 10")

        return suggestions

    async def run(self):
        """Run the advanced MCP server"""
        # Server.run() expects InitializationOptions (server name, version, capabilities)
        from mcp.server.models import InitializationOptions

        await self.initialize()

        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="advanced-mcp-server",
                    server_version="1.0.0",  # placeholder version string
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={}
                    )
                )
            )

    async def cleanup(self):
        """Cleanup resources"""
        if self.db_manager:
            await self.db_manager.close()
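The server reads `advanced_config.json` and passes each entry under `databases` straight into `DatabaseConfig(**db_config)`, so the JSON keys have to match the dataclass field names exactly. The sketch below shows what such a file might look like and how it is parsed. The connection names, hosts, credentials and file paths are placeholders invented for this example, and `parse_database_configs` is a hypothetical stand-in for the loop in `_setup_database_manager`; `DatabaseConfig` is repeated here only so the snippet runs on its own.

```python
import json
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class DatabaseConfig:
    """Same fields as DatabaseConfig in database_manager.py, repeated so this runs alone."""
    db_type: str
    host: Optional[str] = None
    port: Optional[int] = None
    database: Optional[str] = None
    username: Optional[str] = None
    password: Optional[str] = None
    file_path: Optional[str] = None
    pool_size: int = 10
    max_overflow: int = 20
    timeout: int = 30


# Hypothetical contents of advanced_config.json; every value is a placeholder.
EXAMPLE_CONFIG = """
{
  "databases": {
    "analytics": {
      "db_type": "postgresql",
      "host": "localhost",
      "port": 5432,
      "database": "analytics",
      "username": "readonly_user",
      "password": "change-me",
      "pool_size": 5
    },
    "local_cache": {
      "db_type": "sqlite",
      "file_path": "cache.db"
    }
  }
}
"""


def parse_database_configs(raw_json: str) -> Dict[str, DatabaseConfig]:
    """Build one DatabaseConfig per entry, mirroring the loop in _setup_database_manager."""
    config = json.loads(raw_json)
    return {
        name: DatabaseConfig(**settings)
        for name, settings in config.get("databases", {}).items()
    }


configs = parse_database_configs(EXAMPLE_CONFIG)
print(configs["analytics"].pool_size)   # 5
print(configs["local_cache"].db_type)   # sqlite
```

Because the entries are unpacked as keyword arguments, a misspelled or unknown key raises a `TypeError` when the server starts, which surfaces configuration mistakes early. In a real deployment the password would more likely come from an environment variable or a secrets store than from the file itself.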
🌐 REST API Integration & Testing Tools
🔌 Advanced API Connector System
api_connector.py:
#!/usr/bin/env python3
"""
Advanced API Connector for MCP Server
Supports REST API integration, testing, and monitoring
"""

import aiohttp
import asyncio
import json
import time
import hashlib
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging
from urllib.parse import urljoin, urlparse

logger = logging.getLogger(__name__)

@dataclass
class APIEndpoint:
    """API endpoint configuration"""
    name: str
    base_url: str
    endpoints: Dict[str, str]
    auth_type: Optional[str] = None  # 'bearer', 'basic', 'api_key', 'oauth2'
    auth_config: Optional[Dict[str, Any]] = None
    headers: Optional[Dict[str, str]] = None
    timeout: int = 30
    retry_count: int = 3
    rate_limit: Optional[int] = None  # requests per minute

@dataclass
class APIRequest:
    """API request configuration"""
    method: str  # GET, POST, PUT, DELETE, PATCH
    url: str
    headers: Optional[Dict[str, str]] = None
    params: Optional[Dict[str, Any]] = None
    json_data: Optional[Dict[str, Any]] = None
    form_data: Optional[Dict[str, Any]] = None

@dataclass
class APIResponse:
    """API response data"""
    status_code: int
    headers: Dict[str, str]
    content: Union[Dict, List, str]
    response_time: float
    request_url: str
    success: bool
    error: Optional[str] = None

class APIConnector:
    """Advanced API connector with testing and monitoring capabilities"""

    def __init__(self, endpoints: Dict[str, APIEndpoint]):
        self.endpoints = endpoints
        self.request_history = []
        self.performance_stats = {}
        self.rate_limiters = {}
        self.session = None

    async def initialize(self):
        """Initialize HTTP session and rate limiters"""
        # Session-wide default; each request overrides it with the endpoint's own timeout
        timeout = aiohttp.ClientTimeout(total=60)
        self.session = aiohttp.ClientSession(timeout=timeout)

        # Initialize rate limiters
        for endpoint_name, endpoint in self.endpoints.items():
            if endpoint.rate_limit:
                self.rate_limiters[endpoint_name] = {
                    'requests': [],
                    'limit': endpoint.rate_limit
                }

    async def close(self):
        """Close HTTP session"""
        if self.session:
            await self.session.close()

    async def make_request(
        self,
        endpoint_name: str,
        request: APIRequest,
        cache_response: bool = False
    ) -> APIResponse:
        """Make API request with authentication, retries, and monitoring"""

        if endpoint_name not in self.endpoints:
            raise ValueError(f"Endpoint {endpoint_name} not configured")

        endpoint = self.endpoints[endpoint_name]

        # Check rate limiting
        if not await self._check_rate_limit(endpoint_name):
            return APIResponse(
                status_code=429,
                headers={},
                content={"error": "Rate limit exceeded"},
                response_time=0,
                request_url=request.url,
                success=False,
                error="Rate limit exceeded"
            )

        # Prepare request. urljoin() drops any path component of base_url when
        # request.url starts with "/", so relative paths are appended explicitly
        if urlparse(request.url).scheme:
            full_url = request.url  # already an absolute URL
        else:
            full_url = f"{endpoint.base_url.rstrip('/')}/{request.url.lstrip('/')}"
        headers = self._prepare_headers(endpoint, request.headers)

        # Measured across all attempts, so it includes backoff delays
        start_time = time.time()
        last_error = None

        # Retry logic
        for attempt in range(endpoint.retry_count):
            try:
                async with self.session.request(
                    method=request.method,
                    url=full_url,
                    headers=headers,
                    params=request.params,
                    json=request.json_data,
                    data=request.form_data,
                    timeout=aiohttp.ClientTimeout(total=endpoint.timeout)
                ) as response:

                    response_time = time.time() - start_time

                    # Parse response content
                    try:
                        if response.content_type == 'application/json':
                            content = await response.json()
                        else:
                            content = await response.text()
                    except Exception:
                        content = await response.text()

                    api_response = APIResponse(
                        status_code=response.status,
                        headers=dict(response.headers),
                        content=content,
                        response_time=response_time,
                        request_url=full_url,
                        success=200 <= response.status < 300
                    )

                    # Record request for history and stats
                    self._record_request(endpoint_name, request, api_response)

                    return api_response

            except asyncio.TimeoutError:
                last_error = f"Timeout after {endpoint.timeout}s"
                if attempt < endpoint.retry_count - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff

            except aiohttp.ClientError as e:
                last_error = f"Client error: {str(e)}"
                if attempt < endpoint.retry_count - 1:
                    await asyncio.sleep(2 ** attempt)

            except Exception as e:
                # Unexpected errors are not retried
                last_error = f"Unexpected error: {str(e)}"
                break

        # All retries failed
        response_time = time.time() - start_time
        error_response = APIResponse(
            status_code=0,
            headers={},
            content={"error": last_error},
            response_time=response_time,
            request_url=full_url,
            success=False,
            error=last_error
        )

        self._record_request(endpoint_name, request, error_response)
        return error_response

    async def _check_rate_limit(self, endpoint_name: str) -> bool:
        """Check if request is within rate limits"""
        if endpoint_name not in self.rate_limiters:
            return True

        limiter = self.rate_limiters[endpoint_name]
        now = time.time()
        minute_ago = now - 60

        # Clean old requests (sliding 60-second window)
        limiter['requests'] = [req_time for req_time in limiter['requests'] if req_time > minute_ago]

        # Check limit
        if len(limiter['requests']) >= limiter['limit']:
            return False

        # Record this request
        limiter['requests'].append(now)
        return True

    def _prepare_headers(self, endpoint: APIEndpoint, request_headers: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Prepare headers with authentication"""
        headers = {}

        # Add endpoint default headers
        if endpoint.headers:
            headers.update(endpoint.headers)

        # Add request specific headers
        if request_headers:
            headers.update(request_headers)

        # Add authentication (applied last, so it overrides any caller-supplied value)
        if endpoint.auth_type and endpoint.auth_config:
            auth_headers = self._get_auth_headers(endpoint.auth_type, endpoint.auth_config)
            headers.update(auth_headers)

        return headers

    def _get_auth_headers(self, auth_type: str, auth_config: Dict[str, Any]) -> Dict[str, str]:
        """Generate authentication headers"""
        if auth_type == 'bearer':
            return {'Authorization': f"Bearer {auth_config['token']}"}

        elif auth_type == 'api_key':
            key_name = auth_config.get('key_name', 'X-API-Key')
            return {key_name: auth_config['key']}

        elif auth_type == 'basic':
            import base64
            credentials = f"{auth_config['username']}:{auth_config['password']}"
            encoded = base64.b64encode(credentials.encode()).decode()
            return {'Authorization': f"Basic {encoded}"}

        # Other auth types (including 'oauth2') are not handled here and add no headers
        return {}

    def _record_request(self, endpoint_name: str, request: APIRequest, response: APIResponse):
        """Record request for history and performance tracking"""
        record = {
            'timestamp': datetime.now().isoformat(),
            'endpoint_name': endpoint_name,
            'method': request.method,
            'url': response.request_url,
            'status_code': response.status_code,
            'response_time': response.response_time,
            'success': response.success,
            'error': response.error
        }

        self.request_history.append(record)

        # Keep only last 1000 requests
        if len(self.request_history) > 1000:
            self.request_history = self.request_history[-1000:]

        # Update performance stats
        if endpoint_name not in self.performance_stats:
            self.performance_stats[endpoint_name] = {
                'total_requests': 0,
                'successful_requests': 0,
                'failed_requests': 0,
                'avg_response_time': 0,
                'total_response_time': 0
            }

        stats = self.performance_stats[endpoint_name]
        stats['total_requests'] += 1
        stats['total_response_time'] += response.response_time
        stats['avg_response_time'] = stats['total_response_time'] / stats['total_requests']

        if response.success:
            stats['successful_requests'] += 1
        else:
            stats['failed_requests'] += 1

    async def test_endpoint_health(self, endpoint_name: str) -> Dict[str, Any]:
        """Test endpoint health and performance"""
        if endpoint_name not in self.endpoints:
            return {"error": f"Endpoint {endpoint_name} not configured"}

        endpoint = self.endpoints[endpoint_name]
        health_results = []

        # Test basic connectivity
        test_request = APIRequest(method="GET", url="/")

        for i in range(3):  # Run 3 test requests
            # make_request() measures the response time itself
            response = await self.make_request(endpoint_name, test_request)

            health_results.append({
                'attempt': i + 1,
                'success': response.success,
                'status_code': response.status_code,
                'response_time': response.response_time,
                'error': response.error
            })

        # Calculate health metrics
        successful_attempts = sum(1 for result in health_results if result['success'])
        avg_response_time = sum(result['response_time'] for result in health_results) / len(health_results)

        return {
            'endpoint_name': endpoint_name,
            'base_url': endpoint.base_url,
            # Healthy when at least 2 of the 3 probes succeed
            'health_status': 'healthy' if successful_attempts >= 2 else 'unhealthy',
            'success_rate': successful_attempts / len(health_results),
            'avg_response_time': avg_response_time,
            'test_results': health_results,
            'timestamp': datetime.now().isoformat()
        }

313 def get_performance_stats(self) -> Dict[str, Any]:
314 """Get performance statistics for all endpoints"""
315 return {
316 'endpoints': self.performance_stats,
317 'total_requests': len(self.request_history),
318 'recent_requests': self.request_history[-10:] if self.request_history else []
319 }
320
    async def run_api_test_suite(self, test_config: Dict[str, Any]) -> Dict[str, Any]:
        """Run comprehensive API test suite"""
        results = {
            'test_suite': test_config.get('name', 'API Test Suite'),
            'start_time': datetime.now().isoformat(),
            'tests': [],
            'summary': {
                'total_tests': 0,
                'passed': 0,
                'failed': 0,
                'success_rate': 0
            }
        }

        for test_case in test_config.get('test_cases', []):
            test_result = await self._run_single_test(test_case)
            results['tests'].append(test_result)

            results['summary']['total_tests'] += 1
            if test_result['passed']:
                results['summary']['passed'] += 1
            else:
                results['summary']['failed'] += 1

        if results['summary']['total_tests'] > 0:
            results['summary']['success_rate'] = results['summary']['passed'] / results['summary']['total_tests']

        results['end_time'] = datetime.now().isoformat()
        return results

    async def _run_single_test(self, test_case: Dict[str, Any]) -> Dict[str, Any]:
        """Run a single API test case"""
        test_name = test_case.get('name', 'Unnamed Test')
        endpoint_name = test_case['endpoint']

        request = APIRequest(
            method=test_case.get('method', 'GET'),
            url=test_case.get('path', '/'),
            headers=test_case.get('headers'),
            params=test_case.get('params'),
            json_data=test_case.get('json_data'),
            form_data=test_case.get('form_data')
        )

        try:
            response = await self.make_request(endpoint_name, request)

            # Check assertions
            assertions = test_case.get('assertions', {})
            assertion_results = []

            if 'status_code' in assertions:
                expected_status = assertions['status_code']
                assertion_results.append({
                    'assertion': f'status_code == {expected_status}',
                    'passed': response.status_code == expected_status,
                    'actual': response.status_code,
                    'expected': expected_status
                })

            if 'response_time_max' in assertions:
                max_time = assertions['response_time_max']
                assertion_results.append({
                    'assertion': f'response_time <= {max_time}',
                    'passed': response.response_time <= max_time,
                    'actual': response.response_time,
                    'expected': f'<= {max_time}'
                })

            if 'contains' in assertions:
                contains_text = assertions['contains']
                # Serialize structured bodies so the substring check works on JSON too
                content_str = json.dumps(response.content) if isinstance(response.content, (dict, list)) else str(response.content)
                assertion_results.append({
                    'assertion': f'response contains "{contains_text}"',
                    'passed': contains_text in content_str,
                    'actual': content_str[:200] + '...' if len(content_str) > 200 else content_str,
                    'expected': contains_text
                })

            # With no assertions, all() is True and the result depends on response.success
            all_passed = all(result['passed'] for result in assertion_results)

            return {
                'test_name': test_name,
                'endpoint': endpoint_name,
                'method': request.method,
                'url': request.url,
                'passed': all_passed and response.success,
                'response_time': response.response_time,
                'status_code': response.status_code,
                'assertions': assertion_results,
                'error': response.error
            }

        except Exception as e:
            return {
                'test_name': test_name,
                'endpoint': endpoint_name,
                'passed': False,
                'error': str(e)
            }
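The `test_config` dictionary that `run_api_test_suite` consumes is never shown, so here is a minimal sketch of its shape, inferred from the keys the code above reads (`name`, `test_cases`, `endpoint`, `method`, `path`, `params`, `assertions`). The endpoint name `github_api` comes from the configuration later in this article; the test names, paths, and thresholds are illustrative assumptions, and `summarize` is a hypothetical helper that only mirrors the summary arithmetic.

```python
from typing import Any, Dict, List

# Shape inferred from the keys read by run_api_test_suite/_run_single_test.
# Test names, paths, and thresholds are illustrative assumptions.
sample_test_config: Dict[str, Any] = {
    "name": "GitHub smoke tests",
    "test_cases": [
        {
            "name": "Fetch a user profile",
            "endpoint": "github_api",
            "method": "GET",
            "path": "/users/octocat",
            "assertions": {
                "status_code": 200,
                "response_time_max": 2.0,  # assumed to be seconds
                "contains": "login",
            },
        },
        {
            "name": "List repository issues",
            "endpoint": "github_api",
            "path": "/repos/octocat/Hello-World/issues",
            "params": {"state": "open"},
            "assertions": {"status_code": 200},
        },
    ],
}


def summarize(test_results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical helper: same arithmetic as the suite's 'summary' section."""
    total = len(test_results)
    passed = sum(1 for result in test_results if result["passed"])
    return {
        "total_tests": total,
        "passed": passed,
        "failed": total - passed,
        "success_rate": passed / total if total else 0,
    }
```

Passing `sample_test_config` to `run_api_test_suite` should yield one entry in `tests` per test case, with a `summary` computed the same way as `summarize`.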
🧪 API Testing Tools Integration
Next, expose the connector through MCP tools so Claude Code can make requests, run health checks, and execute test suites.
Extended server tools for API testing:
# Add these tools to your AdvancedMCPServer class

Tool(
    name="make_api_request",
    description="Make HTTP request to configured API endpoint",
    inputSchema={
        "type": "object",
        "properties": {
            "endpoint": {
                "type": "string",
                "description": "API endpoint name from configuration"
            },
            "method": {
                "type": "string",
                "description": "HTTP method",
                "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"],
                "default": "GET"
            },
            "path": {
                "type": "string",
                "description": "API path (relative to base URL)"
            },
            "params": {
                "type": "object",
                "description": "Query parameters"
            },
            "json_data": {
                "type": "object",
                "description": "JSON request body"
            },
            "headers": {
                "type": "object",
                "description": "Additional headers"
            }
        },
        "required": ["endpoint", "path"]
    }
),
Tool(
    name="test_api_endpoint",
    description="Run health check on API endpoint",
    inputSchema={
        "type": "object",
        "properties": {
            "endpoint": {
                "type": "string",
                "description": "API endpoint name to test"
            }
        },
        "required": ["endpoint"]
    }
),
Tool(
    name="run_api_test_suite",
    description="Execute comprehensive API test suite",
    inputSchema={
        "type": "object",
        "properties": {
            "test_suite_name": {
                "type": "string",
                "description": "Name of test suite to run"
            },
            "endpoints": {
                "type": "array",
                "description": "Specific endpoints to test",
                "items": {"type": "string"}
            }
        },
        "required": ["test_suite_name"]
    }
),
Tool(
    name="api_performance_stats",
    description="Get API performance statistics and monitoring data",
    inputSchema={
        "type": "object",
        "properties": {
            "endpoint": {
                "type": "string",
                "description": "Specific endpoint name (optional)"
            },
            "time_range": {
                "type": "string",
                "description": "Time range for stats",
                "enum": ["1h", "6h", "24h", "7d"],
                "default": "1h"
            }
        }
    }
)
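Tool definitions only describe inputs; the server still has to route each call to the connector. The following is a minimal sketch of that routing, not the article's actual handler. `StubConnector`, `dispatch_api_tool`, and `_as_coroutine` are hypothetical names, and the stub stands in for the real connector so the example runs without the MCP SDK.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class StubConnector:
    """Hypothetical stand-in for the API connector so the sketch runs alone."""

    async def test_endpoint_health(self, endpoint_name: str) -> Dict[str, Any]:
        return {"endpoint_name": endpoint_name, "health_status": "healthy"}

    def get_performance_stats(self) -> Dict[str, Any]:
        return {"endpoints": {}, "total_requests": 0, "recent_requests": []}


async def _as_coroutine(value: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap a plain value so sync and async handlers share one call path."""
    return value


async def dispatch_api_tool(
    connector: StubConnector, name: str, arguments: Dict[str, Any]
) -> Dict[str, Any]:
    """Route a tool call by name; unknown names return an error payload."""
    handlers: Dict[str, Callable[[], Awaitable[Dict[str, Any]]]] = {
        "test_api_endpoint": lambda: connector.test_endpoint_health(
            arguments["endpoint"]
        ),
        "api_performance_stats": lambda: _as_coroutine(
            connector.get_performance_stats()
        ),
    }
    handler = handlers.get(name)
    if handler is None:
        return {"error": f"Unknown tool: {name}"}
    return await handler()
```

A dictionary of handlers keeps the routing table next to the tool names, so adding `make_api_request` or `run_api_test_suite` is one more entry rather than another `elif` branch.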
🐳 Production Deployment with Docker
🏭 Docker Configuration
Dockerfile:
# Multi-stage build for production MCP server
FROM python:3.11-slim AS builder

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PIP_DISABLE_PIP_VERSION_CHECK=1

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    g++ \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Production stage
FROM python:3.11-slim

# ENV values do not carry over between stages, so set the runtime ones again
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

# Install runtime dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    libpq5 \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Create non-root user
RUN groupadd -r mcpuser && useradd -r -g mcpuser mcpuser

# Create application directory
WORKDIR /app

# Copy application code
COPY --chown=mcpuser:mcpuser . .

# Create logs directory
RUN mkdir -p /app/logs && chown mcpuser:mcpuser /app/logs

# Switch to non-root user
USER mcpuser

# Health check: run health_check.py so a failing check actually marks the container unhealthy
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD ["python", "health_check.py"]

# Expose port (if needed for monitoring)
EXPOSE 8080

# Run the MCP server
CMD ["python", "advanced_server.py"]
docker-compose.yml:
services:
  # PostgreSQL Database
  postgres:
    image: postgres:15-alpine
    container_name: mcp-postgres
    environment:
      POSTGRES_DB: ${POSTGRES_DB:-mcp_database}
      POSTGRES_USER: ${POSTGRES_USER:-mcp_user}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-mcp_password}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init-scripts:/docker-entrypoint-initdb.d
    ports:
      - "${POSTGRES_PORT:-5432}:5432"
    networks:
      - mcp-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-mcp_user}"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Redis Cache
  redis:
    image: redis:7-alpine
    container_name: mcp-redis
    command: redis-server --appendonly yes --requirepass ${REDIS_PASSWORD:-redis_password}
    volumes:
      - redis_data:/data
    ports:
      - "${REDIS_PORT:-6379}:6379"
    networks:
      - mcp-network
    restart: unless-stopped
    healthcheck:
      # Authenticate (the server uses --requirepass) and expect PONG
      test: ["CMD-SHELL", "redis-cli --no-auth-warning -a '${REDIS_PASSWORD:-redis_password}' ping | grep -q PONG"]
      interval: 30s
      timeout: 10s
      retries: 3

  # MCP Server
  mcp-server:
    build: .
    container_name: mcp-advanced-server
    environment:
      - POSTGRES_HOST=postgres
      - POSTGRES_PORT=5432
      - POSTGRES_DB=${POSTGRES_DB:-mcp_database}
      - POSTGRES_USER=${POSTGRES_USER:-mcp_user}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-mcp_password}
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - REDIS_PASSWORD=${REDIS_PASSWORD:-redis_password}
      - LOG_LEVEL=${LOG_LEVEL:-INFO}
      - MCP_SERVER_ENV=production
    volumes:
      - ./config:/app/config:ro
      - ./logs:/app/logs
      - mcp_data:/app/data
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    networks:
      - mcp-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "python", "health_check.py"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Monitoring with Prometheus
  prometheus:
    image: prom/prometheus:latest
    container_name: mcp-prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
    networks:
      - mcp-network
    restart: unless-stopped

  # Grafana Dashboard
  grafana:
    image: grafana/grafana:latest
    container_name: mcp-grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD:-admin123}
    volumes:
      - grafana_data:/var/lib/grafana
      - ./monitoring/grafana/provisioning:/etc/grafana/provisioning:ro
    networks:
      - mcp-network
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:
  mcp_data:
  prometheus_data:
  grafana_data:

networks:
  mcp-network:
    driver: bridge
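The Compose file hands connection details to the `mcp-server` container as environment variables (`POSTGRES_HOST=postgres` and so on). How the server reads them is not shown in this article, so the following is only a sketch under the assumption that database settings are assembled at startup. `postgres_settings_from_env` is a hypothetical helper, and the returned keys mirror the `primary_db` entry of the configuration file.

```python
import os
from typing import Dict, Mapping, Optional, Union


def postgres_settings_from_env(
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, Union[str, int]]:
    """Build PostgreSQL settings from the variables set in docker-compose.yml.

    Hypothetical helper: the key names follow the 'primary_db' config entry.
    """
    env = os.environ if env is None else env
    password = env.get("POSTGRES_PASSWORD")
    if not password:
        # Fail at startup instead of falling back to a guessable default
        raise RuntimeError("POSTGRES_PASSWORD must be set")
    return {
        "db_type": "postgresql",
        "host": env.get("POSTGRES_HOST", "localhost"),
        "port": int(env.get("POSTGRES_PORT", "5432")),
        "database": env.get("POSTGRES_DB", "mcp_database"),
        "username": env.get("POSTGRES_USER", "mcp_user"),
        "password": password,
    }
```

Inside the Compose network the host resolves to the service name `postgres`, which is why reading `POSTGRES_HOST` matters: `localhost` would point at the MCP server container itself.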
📊 Monitoring & Observability
monitoring/prometheus.yml:
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "rules/*.yml"

scrape_configs:
  - job_name: 'mcp-server'
    static_configs:
      - targets: ['mcp-server:8080']
    metrics_path: /metrics
    scrape_interval: 10s

  # PostgreSQL and Redis do not serve Prometheus metrics on their own ports.
  # These jobs assume exporter services (for example postgres_exporter on 9187
  # and redis_exporter on 9121) have been added to docker-compose.yml under
  # the names used below.
  - job_name: 'postgresql'
    static_configs:
      - targets: ['postgres-exporter:9187']

  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093
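The `mcp-server` job expects the server to answer `GET /metrics` on port 8080, which the code in this article does not show. Below is a minimal standard-library sketch that renders the connector's per-endpoint statistics in the Prometheus text exposition format. The metric names `mcp_api_requests_total` and `mcp_api_response_time_seconds_total` are invented for illustration, response times are assumed to be in seconds, and `render_metrics` and `make_handler` are hypothetical helpers. The `prometheus_client` package is the more common choice in practice.

```python
from http.server import BaseHTTPRequestHandler
from typing import Any, Callable, Dict

Stats = Dict[str, Dict[str, Any]]


def render_metrics(performance_stats: Stats) -> str:
    """Render per-endpoint stats in the Prometheus text exposition format."""
    lines = [
        "# HELP mcp_api_requests_total Requests per endpoint and outcome.",
        "# TYPE mcp_api_requests_total counter",
    ]
    outcomes = (("success", "successful_requests"), ("failure", "failed_requests"))
    for endpoint, stats in sorted(performance_stats.items()):
        for outcome, key in outcomes:
            lines.append(
                f'mcp_api_requests_total{{endpoint="{endpoint}",'
                f'outcome="{outcome}"}} {stats[key]}'
            )
    lines += [
        "# HELP mcp_api_response_time_seconds_total Summed response time.",
        "# TYPE mcp_api_response_time_seconds_total counter",
    ]
    for endpoint, stats in sorted(performance_stats.items()):
        lines.append(
            f'mcp_api_response_time_seconds_total{{endpoint="{endpoint}"}} '
            f'{stats["total_response_time"]}'
        )
    return "\n".join(lines) + "\n"


def make_handler(get_stats: Callable[[], Stats]) -> type:
    """Build a request handler class that serves /metrics from get_stats()."""

    class MetricsHandler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            if self.path != "/metrics":
                self.send_error(404)
                return
            body = render_metrics(get_stats()).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    return MetricsHandler
```

To serve it, pass the handler to `http.server.HTTPServer(("0.0.0.0", 8080), make_handler(...))` and run `serve_forever()` in a background thread, supplying a callable that returns the connector's `performance_stats` dictionary.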
health_check.py:
#!/usr/bin/env python3
"""
Health check script for MCP server container
"""

import asyncio
import sys
import json
from pathlib import Path


async def check_health():
    """Perform health checks"""
    try:
        # Check if server configuration exists
        config_path = Path("advanced_config.json")
        if not config_path.exists():
            print("❌ Configuration file missing")
            return False

        # Try to load configuration
        with open(config_path) as f:
            config = json.load(f)

        print("✅ Configuration loaded successfully")

        # Check database connectivity
        # Note: In a real implementation, you'd test actual DB connections
        if 'databases' in config:
            print(f"✅ {len(config['databases'])} database(s) configured")

        # Check API endpoints
        if 'api_endpoints' in config:
            print(f"✅ {len(config['api_endpoints'])} API endpoint(s) configured")

        print("✅ Health check passed")
        return True

    except Exception as e:
        print(f"❌ Health check failed: {e}")
        return False


if __name__ == "__main__":
    result = asyncio.run(check_health())
    sys.exit(0 if result else 1)
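The script above only counts configured databases; as its own comment notes, a real check would test connectivity. One lightweight option, sketched here as an assumption rather than the article's implementation, is to confirm that a TCP connection to each database host and port opens within a timeout. `can_connect` is a hypothetical helper, and a successful connection shows the port is reachable, not that credentials are valid.

```python
import asyncio


async def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout`."""
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
    except (OSError, asyncio.TimeoutError):
        # Refused, unreachable, unresolvable, or too slow
        return False
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass  # the peer may already have dropped the connection
    return True
```

Inside `check_health`, this could be called as `await can_connect("postgres", 5432)` for each entry in `config['databases']` that has a `host` and `port`, returning `False` from the health check when any of them fails.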
🔐 Security & Authentication
🛡️ Security Configuration
security_manager.py:
#!/usr/bin/env python3
"""
Security manager for MCP server
Handles authentication, authorization, and audit logging
"""

import jwt  # PyJWT
import hmac
import time
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
from dataclasses import dataclass

logger = logging.getLogger(__name__)

DEFAULT_JWT_SECRET = 'default-secret-change-me'


@dataclass
class User:
    """User information"""
    username: str
    roles: List[str]
    permissions: List[str]
    api_key: Optional[str] = None
    expires_at: Optional[datetime] = None


@dataclass
class AuditEvent:
    """Audit event information"""
    timestamp: datetime
    user: str
    action: str
    resource: str
    details: Dict[str, Any]
    success: bool
    ip_address: Optional[str] = None


class SecurityManager:
    """Advanced security manager for MCP server"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.jwt_secret = config.get('jwt_secret', DEFAULT_JWT_SECRET)
        if self.jwt_secret == DEFAULT_JWT_SECRET:
            # Tokens signed with a published default can be forged by anyone
            logger.warning("jwt_secret is not configured; the built-in default is insecure")
        self.jwt_algorithm = config.get('jwt_algorithm', 'HS256')
        self.jwt_expiration = config.get('jwt_expiration', 3600)  # 1 hour

        self.users = {}  # In production, use database
        self.audit_events = []
        self.rate_limits = {}

        self._load_users()

    def _load_users(self):
        """Load users from configuration"""
        for user_config in self.config.get('users', []):
            user = User(
                username=user_config['username'],
                roles=user_config.get('roles', []),
                permissions=user_config.get('permissions', []),
                api_key=user_config.get('api_key')
            )
            self.users[user.username] = user

    def create_jwt_token(self, username: str) -> str:
        """Create JWT token for user"""
        if username not in self.users:
            raise ValueError(f"User {username} not found")

        user = self.users[username]

        # 'iat' and 'exp' are numeric dates; whole seconds are conventional
        now = int(time.time())
        payload = {
            'username': username,
            'roles': user.roles,
            'permissions': user.permissions,
            'iat': now,
            'exp': now + self.jwt_expiration
        }

        return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)

    def verify_jwt_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Verify JWT token and return payload"""
        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
            return payload
        except jwt.ExpiredSignatureError:
            logger.warning("JWT token expired")
            return None
        except jwt.InvalidTokenError:
            logger.warning("Invalid JWT token")
            return None

    def verify_api_key(self, api_key: str) -> Optional[User]:
        """Verify API key and return user"""
        # Compare as bytes: compare_digest rejects non-ASCII str arguments
        candidate = api_key.encode('utf-8')
        for user in self.users.values():
            if user.api_key and hmac.compare_digest(user.api_key.encode('utf-8'), candidate):
                return user
        return None

    def check_permission(self, user: User, action: str, resource: str) -> bool:
        """Check if user has permission for action on resource"""
        # Admin role has all permissions
        if 'admin' in user.roles:
            return True

        # Check the exact permission, then the wildcard forms
        # "action:*", "*:resource", and "*:*"
        candidates = (
            f"{action}:{resource}",
            f"{action}:*",
            f"*:{resource}",
            "*:*",
        )
        return any(permission in user.permissions for permission in candidates)

    def check_rate_limit(self, user: User, action: str, limit: int, window: int = 60) -> bool:
        """Check rate limit for user action (sliding window of `window` seconds)"""
        key = f"{user.username}:{action}"
        now = time.time()

        if key not in self.rate_limits:
            self.rate_limits[key] = []

        # Clean old entries
        self.rate_limits[key] = [
            timestamp for timestamp in self.rate_limits[key]
            if timestamp > now - window
        ]

        # Check limit
        if len(self.rate_limits[key]) >= limit:
            return False

        # Record this request
        self.rate_limits[key].append(now)
        return True

    def log_audit_event(
        self,
        user: str,
        action: str,
        resource: str,
        details: Dict[str, Any],
        success: bool,
        ip_address: Optional[str] = None
    ):
        """Log audit event"""
        event = AuditEvent(
            timestamp=datetime.now(),
            user=user,
            action=action,
            resource=resource,
            details=details,
            success=success,
            ip_address=ip_address
        )

        self.audit_events.append(event)

        # Keep only last 10000 events in memory
        if len(self.audit_events) > 10000:
            self.audit_events = self.audit_events[-10000:]

        # Log to file/database
        logger.info(f"AUDIT: {user} {action} {resource} {'SUCCESS' if success else 'FAILED'}")

    def get_audit_events(
        self,
        user: Optional[str] = None,
        action: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Get audit events with filters"""
        filtered_events = []

        for event in self.audit_events:
            # Apply filters
            if user and event.user != user:
                continue
            if action and event.action != action:
                continue
            if start_time and event.timestamp < start_time:
                continue
            if end_time and event.timestamp > end_time:
                continue

            filtered_events.append({
                'timestamp': event.timestamp.isoformat(),
                'user': event.user,
                'action': event.action,
                'resource': event.resource,
                'details': event.details,
                'success': event.success,
                'ip_address': event.ip_address
            })

        # Sort by timestamp descending and limit
        filtered_events.sort(key=lambda x: x['timestamp'], reverse=True)
        return filtered_events[:limit]

    def get_security_stats(self) -> Dict[str, Any]:
        """Get security statistics"""
        total_events = len(self.audit_events)
        successful_events = sum(1 for event in self.audit_events if event.success)
        failed_events = total_events - successful_events

        # Count events by action
        action_counts = {}
        for event in self.audit_events:
            action_counts[event.action] = action_counts.get(event.action, 0) + 1

        # Count events by user
        user_counts = {}
        for event in self.audit_events:
            user_counts[event.user] = user_counts.get(event.user, 0) + 1

        return {
            'total_events': total_events,
            'successful_events': successful_events,
            'failed_events': failed_events,
            'success_rate': successful_events / total_events if total_events > 0 else 0,
            'events_by_action': action_counts,
            'events_by_user': user_counts,
            'active_users': len(self.users),
            'rate_limit_keys': len(self.rate_limits)
        }
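The `SecurityManager` methods are independent building blocks, and the article does not show how a tool handler combines them. One plausible ordering, sketched here as an assumption, is authenticate, then authorize, then rate-limit, with an audit event written for every decision. `guard_tool_call` is a hypothetical helper that accepts any object exposing the methods defined above.

```python
from typing import Any, Tuple


def guard_tool_call(
    security: Any, api_key: str, action: str, resource: str, limit: int = 100
) -> Tuple[bool, str]:
    """Run authentication, authorization, and rate limiting in that order.

    `security` is any object with the SecurityManager methods shown above.
    Returns (allowed, reason) and records an audit event for every decision.
    """
    user = security.verify_api_key(api_key)
    if user is None:
        security.log_audit_event("anonymous", action, resource, {}, False)
        return False, "invalid API key"

    # Check permissions before rate limits so denied calls do not use up quota
    if not security.check_permission(user, action, resource):
        reason = "permission denied"
    elif not security.check_rate_limit(user, action, limit):
        reason = "rate limit exceeded"
    else:
        reason = "ok"

    allowed = reason == "ok"
    security.log_audit_event(
        user.username, action, resource, {"reason": reason}, allowed
    )
    return allowed, reason
```

With a configured manager, a database tool might call `guard_tool_call(manager, api_key, "query", "database", limit=50)` before executing anything and return the `reason` to the client when the call is refused.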
🎯 Complete Advanced Configuration
advanced_config.json:
{
  "server": {
    "name": "advanced-mcp-server",
    "version": "2.0.0",
    "description": "Advanced MCP server with database and API integration",
    "environment": "production"
  },
  "databases": {
    "primary_db": {
      "db_type": "postgresql",
      "host": "localhost",
      "port": 5432,
      "database": "mcp_database",
      "username": "mcp_user",
      "password": "mcp_password",
      "pool_size": 20,
      "timeout": 30
    },
    "analytics_db": {
      "db_type": "mysql",
      "host": "localhost",
      "port": 3306,
      "database": "analytics",
      "username": "analytics_user",
      "password": "analytics_password",
      "pool_size": 10
    },
    "cache_db": {
      "db_type": "sqlite",
      "file_path": "./data/cache.db"
    }
  },
  "api_endpoints": {
    "github_api": {
      "name": "GitHub API",
      "base_url": "https://api.github.com",
      "endpoints": {
        "repos": "/repos/{owner}/{repo}",
        "issues": "/repos/{owner}/{repo}/issues",
        "users": "/users/{username}"
      },
      "auth_type": "bearer",
      "auth_config": {
        "token": "${GITHUB_TOKEN}"
      },
      "headers": {
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "MCP-Server/2.0"
      },
      "rate_limit": 5000,
      "timeout": 30
    },
    "internal_api": {
      "name": "Internal API",
      "base_url": "https://internal.company.com/api",
      "endpoints": {
        "users": "/users",
        "projects": "/projects",
        "metrics": "/metrics"
      },
      "auth_type": "api_key",
      "auth_config": {
        "key_name": "X-API-Key",
        "key": "${INTERNAL_API_KEY}"
      },
      "rate_limit": 1000
    }
  },
  "security": {
    "jwt_secret": "${JWT_SECRET}",
    "jwt_algorithm": "HS256",
    "jwt_expiration": 3600,
    "users": [
      {
        "username": "admin",
        "roles": ["admin"],
        "permissions": ["*:*"],
        "api_key": "${ADMIN_API_KEY}"
      },
      {
        "username": "developer",
        "roles": ["developer"],
        "permissions": [
          "query:database",
          "read:schema",
          "request:api",
          "test:endpoints"
        ],
        "api_key": "${DEVELOPER_API_KEY}"
      }
    ],
    "rate_limits": {
      "default": 100,
      "database_query": 50,
      "api_request": 200
    }
  },
  "logging": {
    "level": "INFO",
    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    "file": "logs/advanced-mcp-server.log",
    "max_size": "10MB",
    "backup_count": 5
  },
  "monitoring": {
    "metrics_enabled": true,
    "metrics_port": 8080,
    "health_check_enabled": true,
    "performance_tracking": true
  }
}
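Note that `json.load` does not expand placeholders such as `${JWT_SECRET}`; loaded as-is, the server would use the literal string `${JWT_SECRET}` as its signing key. The article does not show how substitution happens, so here is a minimal sketch of one approach. `expand_env` and `load_config` are hypothetical helpers, and only the plain `${VAR}` form is handled.

```python
import json
import os
import re
from typing import Any, Mapping, Optional

_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value: Any, env: Optional[Mapping[str, str]] = None) -> Any:
    """Recursively replace ${VAR} placeholders in strings with env values.

    Raises KeyError for an unset variable so a missing secret fails at startup.
    """
    env = os.environ if env is None else env
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda match: env[match.group(1)], value)
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    return value  # numbers, booleans, and None pass through unchanged


def load_config(path: str, env: Optional[Mapping[str, str]] = None) -> Any:
    """Load a JSON config file and expand its ${VAR} placeholders."""
    with open(path, encoding="utf-8") as handle:
        return expand_env(json.load(handle), env)
```

Calling `load_config("advanced_config.json")` at startup would then resolve `GITHUB_TOKEN`, `JWT_SECRET`, and the API keys from the process environment, and raise immediately if any of them is unset.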
🎉 Conclusion & Next Steps
✅ What We’ve Accomplished
In Part 2, we’ve built a production-ready MCP server with:
🗄️ Database Integration:
- Multi-database support (PostgreSQL, MySQL, SQLite)
- Connection pooling and performance monitoring
- Safe query execution with security validation
- Schema introspection and query building assistance
🌐 API Integration:
- REST API connector with authentication
- Rate limiting and retry mechanisms
- Comprehensive API testing suite
- Performance monitoring and health checks
🐳 Production Deployment:
- Docker containerization with multi-stage builds
- Docker Compose for full stack deployment
- Health checks and monitoring integration
- Prometheus and Grafana observability
🔐 Security Features:
- JWT token authentication
- API key management
- Role-based access control
- Audit logging and security monitoring
🚀 Claude Code Integration
With these tools registered, Claude Code can handle requests such as:
Database Operations:
"Query the user table in our production database and show the top 10 most active users"
"Get the schema information for the orders table"
"Explain the execution plan for this complex query"
API Testing:
"Test the health of our GitHub API integration"
"Run a comprehensive test suite on our internal API endpoints"
"Make a request to create a new repository via the GitHub API"
Monitoring & Analytics:
"Show me the performance statistics for all our API endpoints"
"Get the audit log for database operations in the last hour"
"Check the health status of all connected systems"
🔮 Part 3 Preview
In the upcoming Part 3, we’ll cover:
Advanced Topics:
- Real-time WebSocket connections for live data streams
- Message queue integration (Redis, RabbitMQ, Apache Kafka)
- Machine learning model integration for AI-powered tools
- Custom plugin architecture for extensible functionality
Enterprise Features:
- Multi-tenant architecture for SaaS deployments
- Advanced caching strategies with Redis clustering
- Distributed tracing with OpenTelemetry
- High availability and disaster recovery patterns
Advanced Security:
- OAuth2 and SAML integration for enterprise SSO
- Certificate-based authentication for service-to-service communication
- Data encryption at rest and in transit
- Compliance frameworks (SOC2, GDPR, HIPAA)
Stay tuned for the final part of this comprehensive MCP server development series!
🔗 Resources & References
Resource | Link |
---|---|
📂 Part 1 | Basic MCP Server Setup |
🐳 Docker Hub | Official PostgreSQL Image |
📖 API Documentation | MCP Protocol Specification |
🛠️ Example Repository | Advanced MCP Server Examples |