Advanced MCP Server Development with Database Integration - Part 2

🎯 Advanced MCP Server Development

📋 Building on Part 1 Foundation

In Part 1, we built a basic MCP server with utility tools. Now we’ll create production-ready servers with:

  • Database Integration: PostgreSQL, MySQL, SQLite connections with query tools
  • API Connectors: REST API integration and testing capabilities
  • Real-time Processing: WebSocket connections and live data streams
  • Production Deployment: Docker containers, monitoring, and scaling
  • Security: Authentication, authorization, and audit logging

🏗️ Advanced MCP Architecture

graph TD
    A[Claude Code] --> B[MCP Client]
    B --> C[Advanced MCP Server]

    C --> D[Database Manager]
    C --> E[API Connector Hub]
    C --> F[Real-time Processor]
    C --> G[Security Layer]

    D --> H[PostgreSQL Pool]
    D --> I[MySQL Pool]
    D --> J[SQLite Handler]
    D --> K[Query Executor]

    E --> L[REST Client]
    E --> M[GraphQL Client]
    E --> N[WebSocket Client]
    E --> O[Auth Manager]

    F --> P[Event Streams]
    F --> Q[Data Pipelines]
    F --> R[Cache Layer]

    G --> S[JWT Validation]
    G --> T[Rate Limiting]
    G --> U[Audit Logging]
    G --> V[Permission Engine]

    style C fill:#34a853
    style D fill:#4285f4
    style E fill:#ea4335
    style F fill:#fbbc04
    style G fill:#ff6d01

🗄️ Database Integration & Query Tools

📊 Multi-Database Connection Manager

Let’s create a robust database integration system that supports multiple database types:

database_manager.py:

#!/usr/bin/env python3
"""
Advanced Database Manager for MCP Server
Supports PostgreSQL, MySQL, SQLite with connection pooling
"""

import asyncpg
import aiomysql
import aiosqlite
import logging
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from contextlib import asynccontextmanager
from datetime import datetime
import hashlib

logger = logging.getLogger(__name__)

@dataclass
class DatabaseConfig:
    """Database connection configuration"""
    db_type: str  # 'postgresql', 'mysql', 'sqlite'
    host: Optional[str] = None
    port: Optional[int] = None
    database: Optional[str] = None
    username: Optional[str] = None
    password: Optional[str] = None
    file_path: Optional[str] = None  # For SQLite
    pool_size: int = 10
    max_overflow: int = 20
    timeout: int = 30

class DatabaseManager:
    """Advanced database connection manager with pooling"""

    def __init__(self, configs: Dict[str, DatabaseConfig]):
        self.configs = configs
        self.pools = {}
        self.query_cache = {}
        self.query_stats = {}

    async def initialize(self):
        """Initialize database connections and pools"""
        for name, config in self.configs.items():
            try:
                if config.db_type == 'postgresql':
                    self.pools[name] = await asyncpg.create_pool(
                        host=config.host,
                        port=config.port or 5432,
                        database=config.database,
                        user=config.username,
                        password=config.password,
                        min_size=1,
                        max_size=config.pool_size,
                        command_timeout=config.timeout
                    )

                elif config.db_type == 'mysql':
                    self.pools[name] = await aiomysql.create_pool(
                        host=config.host,
                        port=config.port or 3306,
                        db=config.database,
                        user=config.username,
                        password=config.password,
                        minsize=1,
                        maxsize=config.pool_size,
                        autocommit=False
                    )

                elif config.db_type == 'sqlite':
                    # SQLite connections are opened per query, so we store
                    # the database file path instead of a pool
                    self.pools[name] = config.file_path

                logger.info(f"Initialized {config.db_type} connection pool: {name}")

            except Exception as e:
                logger.error(f"Failed to initialize database {name}: {e}")
                raise

    async def close(self):
        """Close all database connections"""
        for name, pool in self.pools.items():
            try:
                config = self.configs[name]
                if config.db_type == 'postgresql':
                    # asyncpg's Pool.close() is a coroutine and must be awaited
                    await pool.close()
                elif config.db_type == 'mysql':
                    # aiomysql's close() is synchronous; wait_closed() is not
                    pool.close()
                    await pool.wait_closed()
                # SQLite connections are opened per query; nothing to close
                logger.info(f"Closed database connection: {name}")
            except Exception as e:
                logger.error(f"Error closing database {name}: {e}")

    @asynccontextmanager
    async def get_connection(self, db_name: str):
        """Get database connection from pool"""
        if db_name not in self.pools:
            raise ValueError(f"Database {db_name} not configured")

        config = self.configs[db_name]

        if config.db_type in ('postgresql', 'mysql'):
            async with self.pools[db_name].acquire() as conn:
                yield conn

        elif config.db_type == 'sqlite':
            async with aiosqlite.connect(self.pools[db_name]) as conn:
                yield conn

    async def execute_query(
        self,
        db_name: str,
        query: str,
        params: Optional[List] = None,
        fetch_type: str = 'all'  # 'all', 'one', 'none'
    ) -> Dict[str, Any]:
        """Execute database query with safety checks and logging"""

        # Security check
        if not self._is_safe_query(query):
            raise ValueError("Query contains potentially unsafe operations")

        # MD5 is used here only as a cache/statistics key, not for security
        query_hash = hashlib.md5(f"{db_name}:{query}".encode()).hexdigest()
        start_time = datetime.now()

        try:
            async with self.get_connection(db_name) as conn:
                config = self.configs[db_name]

                if config.db_type == 'postgresql':
                    if fetch_type == 'all':
                        rows = await conn.fetch(query, *(params or []))
                        result = [dict(row) for row in rows]
                    elif fetch_type == 'one':
                        row = await conn.fetchrow(query, *(params or []))
                        result = dict(row) if row else None
                    else:  # fetch_type == 'none'
                        await conn.execute(query, *(params or []))
                        result = {"status": "executed"}

                elif config.db_type == 'mysql':
                    async with conn.cursor() as cursor:
                        await cursor.execute(query, params or [])

                        if fetch_type == 'all':
                            rows = await cursor.fetchall()
                            # Get column names
                            columns = [desc[0] for desc in cursor.description] if cursor.description else []
                            result = [dict(zip(columns, row)) for row in rows] if rows else []
                        elif fetch_type == 'one':
                            row = await cursor.fetchone()
                            if row and cursor.description:
                                columns = [desc[0] for desc in cursor.description]
                                result = dict(zip(columns, row))
                            else:
                                result = None
                        else:  # fetch_type == 'none'
                            result = {"status": "executed", "rowcount": cursor.rowcount}

                        if fetch_type == 'none':
                            await conn.commit()

                elif config.db_type == 'sqlite':
                    if fetch_type == 'all':
                        async with conn.execute(query, params or []) as cursor:
                            rows = await cursor.fetchall()
                            columns = [desc[0] for desc in cursor.description] if cursor.description else []
                            result = [dict(zip(columns, row)) for row in rows] if rows else []
                    elif fetch_type == 'one':
                        async with conn.execute(query, params or []) as cursor:
                            row = await cursor.fetchone()
                            if row and cursor.description:
                                columns = [desc[0] for desc in cursor.description]
                                result = dict(zip(columns, row))
                            else:
                                result = None
                    else:  # fetch_type == 'none'
                        await conn.execute(query, params or [])
                        await conn.commit()
                        result = {"status": "executed"}

            # Log query statistics
            execution_time = (datetime.now() - start_time).total_seconds()
            self._record_query_stats(query_hash, query, execution_time, True)

            return {
                "success": True,
                "data": result,
                "execution_time_ms": execution_time * 1000,
                "query_hash": query_hash
            }

        except Exception as e:
            execution_time = (datetime.now() - start_time).total_seconds()
            self._record_query_stats(query_hash, query, execution_time, False, str(e))

            return {
                "success": False,
                "error": str(e),
                "execution_time_ms": execution_time * 1000,
                "query_hash": query_hash
            }

    def _is_safe_query(self, query: str) -> bool:
        """Basic query safety validation"""
        query_lower = query.lower().strip()

        # Block potentially dangerous operations
        dangerous_operations = [
            'drop table', 'drop database', 'truncate',
            'delete from', 'update ', 'insert into',
            'create user', 'grant ', 'revoke ',
            'alter table', 'create database'
        ]

        # Allow only read-oriented statements by default: SELECT and WITH
        # (for CTEs), plus EXPLAIN and PRAGMA, which the schema and
        # query-plan tools below rely on.
        # In production, you might want more sophisticated validation.
        allowed_prefixes = ('select', 'with', 'explain', 'pragma')
        if not query_lower.startswith(allowed_prefixes):
            return False

        for dangerous in dangerous_operations:
            if dangerous in query_lower:
                return False

        return True

    def _record_query_stats(self, query_hash: str, query: str, execution_time: float, success: bool, error: Optional[str] = None):
        """Record query execution statistics"""
        if query_hash not in self.query_stats:
            self.query_stats[query_hash] = {
                'query': query[:200] + '...' if len(query) > 200 else query,
                'execution_count': 0,
                'total_time': 0,
                'avg_time': 0,
                'success_count': 0,
                'error_count': 0,
                'last_executed': None,
                'last_error': None
            }

        stats = self.query_stats[query_hash]
        stats['execution_count'] += 1
        stats['total_time'] += execution_time
        stats['avg_time'] = stats['total_time'] / stats['execution_count']
        stats['last_executed'] = datetime.now().isoformat()

        if success:
            stats['success_count'] += 1
        else:
            stats['error_count'] += 1
            stats['last_error'] = error

    async def get_schema_info(self, db_name: str, table_name: Optional[str] = None) -> Dict[str, Any]:
        """Get database schema information"""
        config = self.configs[db_name]

        try:
            if config.db_type == 'postgresql':
                if table_name:
                    query = """
                    SELECT column_name, data_type, is_nullable, column_default
                    FROM information_schema.columns
                    WHERE table_name = $1 AND table_schema = 'public'
                    ORDER BY ordinal_position
                    """
                    result = await self.execute_query(db_name, query, [table_name])
                else:
                    query = """
                    SELECT table_name, table_type
                    FROM information_schema.tables
                    WHERE table_schema = 'public'
                    ORDER BY table_name
                    """
                    result = await self.execute_query(db_name, query)

            elif config.db_type == 'mysql':
                if table_name:
                    query = """
                    SELECT COLUMN_NAME as column_name, DATA_TYPE as data_type,
                           IS_NULLABLE as is_nullable, COLUMN_DEFAULT as column_default
                    FROM INFORMATION_SCHEMA.COLUMNS
                    WHERE TABLE_NAME = %s AND TABLE_SCHEMA = DATABASE()
                    ORDER BY ORDINAL_POSITION
                    """
                    result = await self.execute_query(db_name, query, [table_name])
                else:
                    query = """
                    SELECT TABLE_NAME as table_name, TABLE_TYPE as table_type
                    FROM INFORMATION_SCHEMA.TABLES
                    WHERE TABLE_SCHEMA = DATABASE()
                    ORDER BY TABLE_NAME
                    """
                    result = await self.execute_query(db_name, query)

            elif config.db_type == 'sqlite':
                if table_name:
                    # PRAGMA doesn't support bound parameters, so validate
                    # the table name before interpolating it
                    if not table_name.isidentifier():
                        raise ValueError(f"Invalid table name: {table_name}")
                    query = f"PRAGMA table_info({table_name})"
                    result = await self.execute_query(db_name, query)
                else:
                    query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
                    result = await self.execute_query(db_name, query)

            else:
                raise ValueError(f"Unsupported database type: {config.db_type}")

            return result

        except Exception as e:
            return {
                "success": False,
                "error": f"Failed to get schema info: {str(e)}"
            }

    def get_query_statistics(self) -> Dict[str, Any]:
        """Get query execution statistics"""
        return {
            "total_queries": len(self.query_stats),
            "statistics": self.query_stats
        }
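The row-to-dict pattern the manager uses (`dict(zip(columns, row))` over `cursor.description`) and its read-only query gate can be tried without any async drivers. Here is a minimal synchronous sketch using only the standard library's `sqlite3`; the `users` table and its columns are made up for illustration:

```python
import sqlite3

ALLOWED_PREFIXES = ("select", "with", "explain", "pragma")

def execute_select(conn: sqlite3.Connection, query: str, params: tuple = ()) -> list:
    """Run a read-oriented query and return rows as dicts, mirroring execute_query."""
    if not query.strip().lower().startswith(ALLOWED_PREFIXES):
        raise ValueError("Only read-oriented statements are allowed")
    cursor = conn.execute(query, params)
    # Column names come from cursor.description, exactly as in the async version
    columns = [desc[0] for desc in cursor.description] if cursor.description else []
    return [dict(zip(columns, row)) for row in cursor.fetchall()]

# Demo on an in-memory database with a hypothetical `users` table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("alice",), ("bob",)])
rows = execute_select(conn, "SELECT id, name FROM users ORDER BY id")
print(rows)  # [{'id': 1, 'name': 'alice'}, {'id': 2, 'name': 'bob'}]
```

The same gate rejects writes: `execute_select(conn, "DELETE FROM users")` raises `ValueError` before touching the database.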

🛠️ Advanced Database Tools for MCP Server

Now let’s integrate the database manager into our MCP server:

advanced_server.py:

#!/usr/bin/env python3
"""
Advanced MCP Server with Database Integration
"""

import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional

from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

from database_manager import DatabaseManager, DatabaseConfig

logger = logging.getLogger("advanced-mcp-server")

class AdvancedMCPServer:
    def __init__(self, config_path: str = "advanced_config.json"):
        self.server = Server("advanced-mcp-server")
        self.config_path = config_path
        self.db_manager = None
        self.config = None

        self._setup_tools()

    async def initialize(self):
        """Initialize server and database connections"""
        await self._load_config()
        await self._setup_database_manager()

    async def _load_config(self):
        """Load server configuration"""
        try:
            with open(self.config_path) as f:
                self.config = json.load(f)
        except Exception as e:
            logger.error(f"Failed to load config: {e}")
            raise

    async def _setup_database_manager(self):
        """Setup database manager with configured connections"""
        if 'databases' not in self.config:
            logger.warning("No database configurations found")
            return

        db_configs = {}
        for name, db_config in self.config['databases'].items():
            db_configs[name] = DatabaseConfig(**db_config)

        self.db_manager = DatabaseManager(db_configs)
        await self.db_manager.initialize()

    def _setup_tools(self):
        """Setup MCP tools"""

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            return [
                Tool(
                    name="execute_sql_query",
                    description="Execute SQL query on specified database",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database connection name"
                            },
                            "query": {
                                "type": "string",
                                "description": "SQL query to execute (SELECT only for security)"
                            },
                            "params": {
                                "type": "array",
                                "description": "Query parameters",
                                "items": {"type": "string"},
                                "default": []
                            },
                            "limit": {
                                "type": "integer",
                                "description": "Maximum number of rows to return",
                                "default": 100,
                                "maximum": 1000
                            }
                        },
                        "required": ["database", "query"]
                    }
                ),
                Tool(
                    name="get_database_schema",
                    description="Get database schema information",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database connection name"
                            },
                            "table": {
                                "type": "string",
                                "description": "Specific table name (optional)"
                            }
                        },
                        "required": ["database"]
                    }
                ),
                Tool(
                    name="database_health_check",
                    description="Check database connection health and statistics",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database connection name (optional - checks all if not specified)"
                            }
                        }
                    }
                ),
                Tool(
                    name="query_builder_assistant",
                    description="Help build SQL queries based on natural language description",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database connection name"
                            },
                            "description": {
                                "type": "string",
                                "description": "Natural language description of what you want to query"
                            },
                            "tables": {
                                "type": "array",
                                "description": "Tables to include in the query",
                                "items": {"type": "string"},
                                "default": []
                            }
                        },
                        "required": ["database", "description"]
                    }
                ),
                Tool(
                    name="explain_query_plan",
                    description="Explain SQL query execution plan",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database connection name"
                            },
                            "query": {
                                "type": "string",
                                "description": "SQL query to explain"
                            }
                        },
                        "required": ["database", "query"]
                    }
                )
            ]

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
            if not self.db_manager:
                return [TextContent(
                    type="text",
                    text="Database manager not initialized"
                )]

            try:
                if name == "execute_sql_query":
                    return await self._handle_sql_query(arguments)
                elif name == "get_database_schema":
                    return await self._handle_schema_info(arguments)
                elif name == "database_health_check":
                    return await self._handle_health_check(arguments)
                elif name == "query_builder_assistant":
                    return await self._handle_query_builder(arguments)
                elif name == "explain_query_plan":
                    return await self._handle_explain_plan(arguments)
                else:
                    return [TextContent(
                        type="text",
                        text=f"Unknown tool: {name}"
                    )]

            except Exception as e:
                logger.error(f"Tool execution error for {name}: {e}")
                return [TextContent(
                    type="text",
                    text=f"Error executing {name}: {str(e)}"
                )]

    async def _handle_sql_query(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle SQL query execution"""
        database = arguments.get("database")
        query = arguments.get("query")
        params = arguments.get("params", [])
        limit = arguments.get("limit", 100)

        # Add LIMIT clause if not present in SELECT queries
        if query.strip().lower().startswith("select") and "limit" not in query.lower():
            query = f"{query.rstrip(';')} LIMIT {limit}"

        result = await self.db_manager.execute_query(
            database, query, params, fetch_type='all'
        )

        if result["success"]:
            data = result["data"]
            execution_time = result["execution_time_ms"]

            response = {
                "query": query,
                "database": database,
                "execution_time_ms": execution_time,
                "row_count": len(data) if isinstance(data, list) else 0,
                "results": data[:limit] if isinstance(data, list) else data
            }

            return [TextContent(
                type="text",
                text=f"SQL Query Results:\n{json.dumps(response, indent=2, default=str)}"
            )]
        else:
            return [TextContent(
                type="text",
                text=f"Query failed: {result['error']}"
            )]

    async def _handle_schema_info(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle database schema information request"""
        database = arguments.get("database")
        table = arguments.get("table")

        result = await self.db_manager.get_schema_info(database, table)

        if result["success"]:
            return [TextContent(
                type="text",
                text=f"Database Schema Information:\n{json.dumps(result['data'], indent=2, default=str)}"
            )]
        else:
            return [TextContent(
                type="text",
                text=f"Schema query failed: {result['error']}"
            )]

    async def _handle_health_check(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle database health check"""
        specific_db = arguments.get("database")

        health_info = {
            "timestamp": datetime.now().isoformat(),
            "databases": {}
        }

        databases_to_check = [specific_db] if specific_db else list(self.db_manager.configs.keys())

        for db_name in databases_to_check:
            try:
                # Test basic connectivity with simple query
                result = await self.db_manager.execute_query(
                    db_name, "SELECT 1 as health_check", fetch_type='one'
                )

                health_info["databases"][db_name] = {
                    "status": "healthy" if result["success"] else "unhealthy",
                    "response_time_ms": result["execution_time_ms"],
                    "error": result.get("error")
                }

            except Exception as e:
                health_info["databases"][db_name] = {
                    "status": "unhealthy",
                    "error": str(e)
                }

        # Add query statistics
        health_info["query_statistics"] = self.db_manager.get_query_statistics()

        return [TextContent(
            type="text",
            text=f"Database Health Check:\n{json.dumps(health_info, indent=2, default=str)}"
        )]

    async def _handle_query_builder(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle query builder assistance"""
        database = arguments.get("database")
        description = arguments.get("description")
        tables = arguments.get("tables", [])

        # Get schema information to help build query
        schema_result = await self.db_manager.get_schema_info(database)

        if not schema_result["success"]:
            return [TextContent(
                type="text",
                text=f"Could not retrieve schema: {schema_result['error']}"
            )]

        available_tables = schema_result["data"]

        # Simple query building logic based on description
        # In a production system, you might use NLP or ML for better query generation
        query_suggestions = self._generate_query_suggestions(description, available_tables, tables)

        response = {
            "description": description,
            "database": database,
            "available_tables": [table.get("table_name", table.get("name")) for table in available_tables],
            "suggested_queries": query_suggestions,
            "note": "Review these suggestions and modify as needed. Always test queries in a safe environment."
        }

        return [TextContent(
            type="text",
            text=f"Query Builder Assistant:\n{json.dumps(response, indent=2)}"
        )]

    async def _handle_explain_plan(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle query execution plan explanation"""
        database = arguments.get("database")
        query = arguments.get("query")

        config = self.db_manager.configs[database]

        # Build EXPLAIN query based on database type.
        # Note: EXPLAIN ANALYZE actually executes the query; use plain
        # EXPLAIN if you only want the plan.
        if config.db_type == 'postgresql':
            explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
        elif config.db_type == 'mysql':
            explain_query = f"EXPLAIN FORMAT=JSON {query}"
        elif config.db_type == 'sqlite':
            explain_query = f"EXPLAIN QUERY PLAN {query}"
        else:
            return [TextContent(
                type="text",
                text=f"EXPLAIN not supported for database type: {config.db_type}"
            )]

        result = await self.db_manager.execute_query(
            database, explain_query, fetch_type='all'
        )

        if result["success"]:
            return [TextContent(
                type="text",
                text=f"Query Execution Plan:\n{json.dumps(result['data'], indent=2, default=str)}"
            )]
        else:
            return [TextContent(
                type="text",
                text=f"EXPLAIN failed: {result['error']}"
            )]

    def _generate_query_suggestions(self, description: str, available_tables: List[Dict], target_tables: List[str]) -> List[str]:
        """Generate SQL query suggestions based on description"""
        suggestions = []

        # Simple keyword-based query generation
        # In production, you'd use more sophisticated NLP
        description_lower = description.lower()

        if target_tables:
            tables_str = ", ".join(target_tables)
        elif available_tables:
            # Use first table as default
            tables_str = available_tables[0].get("table_name", available_tables[0].get("name", "table_name"))
        else:
            # No schema information available; fall back to a placeholder
            tables_str = "table_name"

        if "count" in description_lower:
            suggestions.append(f"SELECT COUNT(*) FROM {tables_str}")

        if "all" in description_lower or "everything" in description_lower:
            suggestions.append(f"SELECT * FROM {tables_str} LIMIT 10")

        if "recent" in description_lower or "latest" in description_lower:
            suggestions.append(f"SELECT * FROM {tables_str} ORDER BY created_at DESC LIMIT 10")

        if "top" in description_lower:
            suggestions.append(f"SELECT * FROM {tables_str} ORDER BY id DESC LIMIT 10")

        # Default suggestion
        if not suggestions:
            suggestions.append(f"SELECT * FROM {tables_str} LIMIT 10")

        return suggestions

394    async def run(self):
395        """Run the advanced MCP server"""
396        await self.initialize()
397
398        async with stdio_server() as (read_stream, write_stream):
399            await self.server.run(
400                read_stream,
401                write_stream,
402                InitializationOptions(
403                    server_name="advanced-mcp-server",  # illustrative name/version
404                    server_version="1.0.0",
405                    capabilities=self.server.get_capabilities(
406                        notification_options=NotificationOptions(),
407                        experimental_capabilities={})
408                )
409            )
410
411    async def cleanup(self):
412        """Cleanup resources"""
413        if self.db_manager:
414            await self.db_manager.close()
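The keyword heuristic inside `_generate_query_suggestions` is easy to exercise in isolation. This standalone sketch is a simplified, hypothetical mirror of the method's logic (not part of the server) showing how descriptions map to SQL:

```python
def suggest_queries(description: str, table: str) -> list[str]:
    """Simplified mirror of the server's keyword-based suggestion heuristic."""
    d = description.lower()
    suggestions = []
    if "count" in d:
        suggestions.append(f"SELECT COUNT(*) FROM {table}")
    if "all" in d or "everything" in d:
        suggestions.append(f"SELECT * FROM {table} LIMIT 10")
    if "recent" in d or "latest" in d:
        suggestions.append(f"SELECT * FROM {table} ORDER BY created_at DESC LIMIT 10")
    if not suggestions:  # default fallback when no keyword matches
        suggestions.append(f"SELECT * FROM {table} LIMIT 10")
    return suggestions

print(suggest_queries("count recent orders", "orders"))
```

Because the heuristic is pure string matching, it is cheap to unit-test; the real method adds table resolution on top of this.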

🌐 REST API Integration & Testing Tools

🔌 Advanced API Connector System

api_connector.py:

  1#!/usr/bin/env python3
  2"""
  3Advanced API Connector for MCP Server
  4Supports REST API integration, testing, and monitoring
  5"""
  6
  7import aiohttp
  8import asyncio
  9import json
 10import time
 11import hashlib
 12from typing import Dict, List, Any, Optional, Union
 13from dataclasses import dataclass, asdict
 14from datetime import datetime, timedelta
 15import logging
 16from urllib.parse import urljoin, urlparse
 17
 18logger = logging.getLogger(__name__)
 19
 20@dataclass
 21class APIEndpoint:
 22    """API endpoint configuration"""
 23    name: str
 24    base_url: str
 25    endpoints: Dict[str, str]
 26    auth_type: Optional[str] = None  # 'bearer', 'basic', 'api_key', 'oauth2'
 27    auth_config: Optional[Dict[str, Any]] = None
 28    headers: Optional[Dict[str, str]] = None
 29    timeout: int = 30
 30    retry_count: int = 3
 31    rate_limit: Optional[int] = None  # requests per minute
 32
 33@dataclass
 34class APIRequest:
 35    """API request configuration"""
 36    method: str  # GET, POST, PUT, DELETE, PATCH
 37    url: str
 38    headers: Optional[Dict[str, str]] = None
 39    params: Optional[Dict[str, Any]] = None
 40    json_data: Optional[Dict[str, Any]] = None
 41    form_data: Optional[Dict[str, Any]] = None
 42
 43@dataclass
 44class APIResponse:
 45    """API response data"""
 46    status_code: int
 47    headers: Dict[str, str]
 48    content: Union[Dict, List, str]
 49    response_time: float
 50    request_url: str
 51    success: bool
 52    error: Optional[str] = None
 53
 54class APIConnector:
 55    """Advanced API connector with testing and monitoring capabilities"""
 56
 57    def __init__(self, endpoints: Dict[str, APIEndpoint]):
 58        self.endpoints = endpoints
 59        self.request_history = []
 60        self.performance_stats = {}
 61        self.rate_limiters = {}
 62        self.session = None
 63
 64    async def initialize(self):
 65        """Initialize HTTP session and rate limiters"""
 66        timeout = aiohttp.ClientTimeout(total=60)
 67        self.session = aiohttp.ClientSession(timeout=timeout)
 68
 69        # Initialize rate limiters
 70        for endpoint_name, endpoint in self.endpoints.items():
 71            if endpoint.rate_limit:
 72                self.rate_limiters[endpoint_name] = {
 73                    'requests': [],
 74                    'limit': endpoint.rate_limit
 75                }
 76
 77    async def close(self):
 78        """Close HTTP session"""
 79        if self.session:
 80            await self.session.close()
 81
 82    async def make_request(
 83        self,
 84        endpoint_name: str,
 85        request: APIRequest,
 86        cache_response: bool = False
 87    ) -> APIResponse:
 88        """Make API request with authentication, retries, and monitoring"""
 89
 90        if endpoint_name not in self.endpoints:
 91            raise ValueError(f"Endpoint {endpoint_name} not configured")
 92
 93        endpoint = self.endpoints[endpoint_name]
 94
 95        # Check rate limiting
 96        if not await self._check_rate_limit(endpoint_name):
 97            return APIResponse(
 98                status_code=429,
 99                headers={},
100                content={"error": "Rate limit exceeded"},
101                response_time=0,
102                request_url=request.url,
103                success=False,
104                error="Rate limit exceeded"
105            )
106
107        # Prepare request
108        full_url = urljoin(endpoint.base_url, request.url)
109        headers = self._prepare_headers(endpoint, request.headers)
110
111        start_time = time.time()
112        last_error = None
113
114        # Retry logic
115        for attempt in range(endpoint.retry_count):
116            try:
117                async with self.session.request(
118                    method=request.method,
119                    url=full_url,
120                    headers=headers,
121                    params=request.params,
122                    json=request.json_data,
123                    data=request.form_data,
124                    timeout=aiohttp.ClientTimeout(total=endpoint.timeout)
125                ) as response:
126
127                    response_time = time.time() - start_time
128
129                    # Parse response content
130                    try:
131                        if response.content_type == 'application/json':
132                            content = await response.json()
133                        else:
134                            content = await response.text()
135                    except Exception:
136                        content = await response.text()
137
138                    api_response = APIResponse(
139                        status_code=response.status,
140                        headers=dict(response.headers),
141                        content=content,
142                        response_time=response_time,
143                        request_url=full_url,
144                        success=200 <= response.status < 300
145                    )
146
147                    # Record request for history and stats
148                    self._record_request(endpoint_name, request, api_response)
149
150                    return api_response
151
152            except asyncio.TimeoutError:
153                last_error = f"Timeout after {endpoint.timeout}s"
154                if attempt < endpoint.retry_count - 1:
155                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
156
157            except aiohttp.ClientError as e:
158                last_error = f"Client error: {str(e)}"
159                if attempt < endpoint.retry_count - 1:
160                    await asyncio.sleep(2 ** attempt)
161
162            except Exception as e:
163                last_error = f"Unexpected error: {str(e)}"
164                break
165
166        # All retries failed
167        response_time = time.time() - start_time
168        error_response = APIResponse(
169            status_code=0,
170            headers={},
171            content={"error": last_error},
172            response_time=response_time,
173            request_url=full_url,
174            success=False,
175            error=last_error
176        )
177
178        self._record_request(endpoint_name, request, error_response)
179        return error_response
180
181    async def _check_rate_limit(self, endpoint_name: str) -> bool:
182        """Check if request is within rate limits"""
183        if endpoint_name not in self.rate_limiters:
184            return True
185
186        limiter = self.rate_limiters[endpoint_name]
187        now = time.time()
188        minute_ago = now - 60
189
190        # Clean old requests
191        limiter['requests'] = [req_time for req_time in limiter['requests'] if req_time > minute_ago]
192
193        # Check limit
194        if len(limiter['requests']) >= limiter['limit']:
195            return False
196
197        # Record this request
198        limiter['requests'].append(now)
199        return True
200
201    def _prepare_headers(self, endpoint: APIEndpoint, request_headers: Optional[Dict[str, str]]) -> Dict[str, str]:
202        """Prepare headers with authentication"""
203        headers = {}
204
205        # Add endpoint default headers
206        if endpoint.headers:
207            headers.update(endpoint.headers)
208
209        # Add request specific headers
210        if request_headers:
211            headers.update(request_headers)
212
213        # Add authentication
214        if endpoint.auth_type and endpoint.auth_config:
215            auth_headers = self._get_auth_headers(endpoint.auth_type, endpoint.auth_config)
216            headers.update(auth_headers)
217
218        return headers
219
220    def _get_auth_headers(self, auth_type: str, auth_config: Dict[str, Any]) -> Dict[str, str]:
221        """Generate authentication headers"""
222        if auth_type == 'bearer':
223            return {'Authorization': f"Bearer {auth_config['token']}"}
224
225        elif auth_type == 'api_key':
226            key_name = auth_config.get('key_name', 'X-API-Key')
227            return {key_name: auth_config['key']}
228
229        elif auth_type == 'basic':
230            import base64
231            credentials = f"{auth_config['username']}:{auth_config['password']}"
232            encoded = base64.b64encode(credentials.encode()).decode()
233            return {'Authorization': f"Basic {encoded}"}
234
235        return {}
236
237    def _record_request(self, endpoint_name: str, request: APIRequest, response: APIResponse):
238        """Record request for history and performance tracking"""
239        record = {
240            'timestamp': datetime.now().isoformat(),
241            'endpoint_name': endpoint_name,
242            'method': request.method,
243            'url': response.request_url,
244            'status_code': response.status_code,
245            'response_time': response.response_time,
246            'success': response.success,
247            'error': response.error
248        }
249
250        self.request_history.append(record)
251
252        # Keep only last 1000 requests
253        if len(self.request_history) > 1000:
254            self.request_history = self.request_history[-1000:]
255
256        # Update performance stats
257        if endpoint_name not in self.performance_stats:
258            self.performance_stats[endpoint_name] = {
259                'total_requests': 0,
260                'successful_requests': 0,
261                'failed_requests': 0,
262                'avg_response_time': 0,
263                'total_response_time': 0
264            }
265
266        stats = self.performance_stats[endpoint_name]
267        stats['total_requests'] += 1
268        stats['total_response_time'] += response.response_time
269        stats['avg_response_time'] = stats['total_response_time'] / stats['total_requests']
270
271        if response.success:
272            stats['successful_requests'] += 1
273        else:
274            stats['failed_requests'] += 1
275
276    async def test_endpoint_health(self, endpoint_name: str) -> Dict[str, Any]:
277        """Test endpoint health and performance"""
278        if endpoint_name not in self.endpoints:
279            return {"error": f"Endpoint {endpoint_name} not configured"}
280
281        endpoint = self.endpoints[endpoint_name]
282        health_results = []
283
284        # Test basic connectivity
285        test_request = APIRequest(method="GET", url="/")
286
287        for i in range(3):  # Run 3 test requests
288            # make_request measures response_time itself, so no extra timer is needed
289            response = await self.make_request(endpoint_name, test_request)
290
291            health_results.append({
292                'attempt': i + 1,
293                'success': response.success,
294                'status_code': response.status_code,
295                'response_time': response.response_time,
296                'error': response.error
297            })
298
299        # Calculate health metrics
300        successful_attempts = sum(1 for result in health_results if result['success'])
301        avg_response_time = sum(result['response_time'] for result in health_results) / len(health_results)
302
303        return {
304            'endpoint_name': endpoint_name,
305            'base_url': endpoint.base_url,
306            'health_status': 'healthy' if successful_attempts >= 2 else 'unhealthy',
307            'success_rate': successful_attempts / len(health_results),
308            'avg_response_time': avg_response_time,
309            'test_results': health_results,
310            'timestamp': datetime.now().isoformat()
311        }
312
313    def get_performance_stats(self) -> Dict[str, Any]:
314        """Get performance statistics for all endpoints"""
315        return {
316            'endpoints': self.performance_stats,
317            'total_requests': len(self.request_history),
318            'recent_requests': self.request_history[-10:] if self.request_history else []
319        }
320
321    async def run_api_test_suite(self, test_config: Dict[str, Any]) -> Dict[str, Any]:
322        """Run comprehensive API test suite"""
323        results = {
324            'test_suite': test_config.get('name', 'API Test Suite'),
325            'start_time': datetime.now().isoformat(),
326            'tests': [],
327            'summary': {
328                'total_tests': 0,
329                'passed': 0,
330                'failed': 0,
331                'success_rate': 0
332            }
333        }
334
335        for test_case in test_config.get('test_cases', []):
336            test_result = await self._run_single_test(test_case)
337            results['tests'].append(test_result)
338
339            results['summary']['total_tests'] += 1
340            if test_result['passed']:
341                results['summary']['passed'] += 1
342            else:
343                results['summary']['failed'] += 1
344
345        if results['summary']['total_tests'] > 0:
346            results['summary']['success_rate'] = results['summary']['passed'] / results['summary']['total_tests']
347
348        results['end_time'] = datetime.now().isoformat()
349        return results
350
351    async def _run_single_test(self, test_case: Dict[str, Any]) -> Dict[str, Any]:
352        """Run a single API test case"""
353        test_name = test_case.get('name', 'Unnamed Test')
354        endpoint_name = test_case['endpoint']
355
356        request = APIRequest(
357            method=test_case.get('method', 'GET'),
358            url=test_case.get('path', '/'),
359            headers=test_case.get('headers'),
360            params=test_case.get('params'),
361            json_data=test_case.get('json_data'),
362            form_data=test_case.get('form_data')
363        )
364
365        try:
366            response = await self.make_request(endpoint_name, request)
367
368            # Check assertions
369            assertions = test_case.get('assertions', {})
370            assertion_results = []
371
372            if 'status_code' in assertions:
373                expected_status = assertions['status_code']
374                assertion_results.append({
375                    'assertion': f'status_code == {expected_status}',
376                    'passed': response.status_code == expected_status,
377                    'actual': response.status_code,
378                    'expected': expected_status
379                })
380
381            if 'response_time_max' in assertions:
382                max_time = assertions['response_time_max']
383                assertion_results.append({
384                    'assertion': f'response_time <= {max_time}',
385                    'passed': response.response_time <= max_time,
386                    'actual': response.response_time,
387                    'expected': f'<= {max_time}'
388                })
389
390            if 'contains' in assertions:
391                contains_text = assertions['contains']
392                content_str = json.dumps(response.content) if isinstance(response.content, (dict, list)) else str(response.content)
393                assertion_results.append({
394                    'assertion': f'response contains "{contains_text}"',
395                    'passed': contains_text in content_str,
396                    'actual': content_str[:200] + '...' if len(content_str) > 200 else content_str
397                })
398
399            all_passed = all(result['passed'] for result in assertion_results)
400
401            return {
402                'test_name': test_name,
403                'endpoint': endpoint_name,
404                'method': request.method,
405                'url': request.url,
406                'passed': all_passed and response.success,
407                'response_time': response.response_time,
408                'status_code': response.status_code,
409                'assertions': assertion_results,
410                'error': response.error
411            }
412
413        except Exception as e:
414            return {
415                'test_name': test_name,
416                'endpoint': endpoint_name,
417                'passed': False,
418                'error': str(e)
419            }
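The per-minute sliding window behind `_check_rate_limit` deserves a closer look on its own: timestamps are pruned to the window, and a request is rejected once the list is full. This self-contained sketch implements the same scheme, with an injectable clock (an addition for deterministic testing; the server's version calls `time.time()` directly):

```python
import time
from typing import List, Optional

class SlidingWindowLimiter:
    """Allow at most `limit` requests per `window` seconds (same scheme as _check_rate_limit)."""

    def __init__(self, limit: int, window: float = 60.0):
        self.limit = limit
        self.window = window
        self.requests: List[float] = []

    def allow(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # Drop timestamps that have fallen out of the window
        self.requests = [t for t in self.requests if t > now - self.window]
        if len(self.requests) >= self.limit:
            return False
        self.requests.append(now)
        return True
```

Note that this in-memory approach resets on restart and is per-process only; a shared store (e.g. Redis) would be needed to rate-limit across replicas.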

🧪 API Testing Tools Integration

Expose the connector through MCP by registering these additional tools in your AdvancedMCPServer class:

 1# Add these tools to your AdvancedMCPServer class
 2
 3Tool(
 4    name="make_api_request",
 5    description="Make HTTP request to configured API endpoint",
 6    inputSchema={
 7        "type": "object",
 8        "properties": {
 9            "endpoint": {
10                "type": "string",
11                "description": "API endpoint name from configuration"
12            },
13            "method": {
14                "type": "string",
15                "description": "HTTP method",
16                "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"],
17                "default": "GET"
18            },
19            "path": {
20                "type": "string",
21                "description": "API path (relative to base URL)"
22            },
23            "params": {
24                "type": "object",
25                "description": "Query parameters"
26            },
27            "json_data": {
28                "type": "object",
29                "description": "JSON request body"
30            },
31            "headers": {
32                "type": "object",
33                "description": "Additional headers"
34            }
35        },
36        "required": ["endpoint", "path"]
37    }
38),
39Tool(
40    name="test_api_endpoint",
41    description="Run health check on API endpoint",
42    inputSchema={
43        "type": "object",
44        "properties": {
45            "endpoint": {
46                "type": "string",
47                "description": "API endpoint name to test"
48            }
49        },
50        "required": ["endpoint"]
51    }
52),
53Tool(
54    name="run_api_test_suite",
55    description="Execute comprehensive API test suite",
56    inputSchema={
57        "type": "object",
58        "properties": {
59            "test_suite_name": {
60                "type": "string",
61                "description": "Name of test suite to run"
62            },
63            "endpoints": {
64                "type": "array",
65                "description": "Specific endpoints to test",
66                "items": {"type": "string"}
67            }
68        },
69        "required": ["test_suite_name"]
70    }
71),
72Tool(
73    name="api_performance_stats",
74    description="Get API performance statistics and monitoring data",
75    inputSchema={
76        "type": "object",
77        "properties": {
78            "endpoint": {
79                "type": "string",
80                "description": "Specific endpoint name (optional)"
81            },
82            "time_range": {
83                "type": "string",
84                "description": "Time range for stats",
85                "enum": ["1h", "6h", "24h", "7d"],
86                "default": "1h"
87            }
88        }
89    }
90)
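The `run_api_test_suite` tool ultimately feeds a config dict into `run_api_test_suite`/`_run_single_test`, so its shape must match the keys those methods read (`name`, `endpoint`, `method`, `path`, and an `assertions` block). A hypothetical suite against a `users_api` endpoint might look like:

```python
example_suite = {
    "name": "Users API smoke tests",
    "test_cases": [
        {
            "name": "List users returns 200 quickly",
            "endpoint": "users_api",        # must match a configured APIEndpoint name
            "method": "GET",
            "path": "/users",
            "assertions": {
                "status_code": 200,
                "response_time_max": 1.5,   # seconds
            },
        },
        {
            "name": "Create user echoes the name",
            "endpoint": "users_api",
            "method": "POST",
            "path": "/users",
            "json_data": {"name": "Ada"},
            "assertions": {"status_code": 201, "contains": "Ada"},
        },
    ],
}
```

The endpoint name and paths here are illustrative; any assertion key the runner does not recognize is silently ignored, so keep to the three supported checks.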

🐳 Production Deployment with Docker

🏭 Docker Configuration

Dockerfile:

 1# Multi-stage build for production MCP server
 2FROM python:3.11-slim as builder
 3
 4# Set environment variables
 5ENV PYTHONDONTWRITEBYTECODE=1
 6ENV PYTHONUNBUFFERED=1
 7ENV PIP_DISABLE_PIP_VERSION_CHECK=1
 8
 9# Install build dependencies
10RUN apt-get update && apt-get install -y \
11    gcc \
12    g++ \
13    libpq-dev \
14    && rm -rf /var/lib/apt/lists/*
15
16# Create virtual environment
17RUN python -m venv /opt/venv
18ENV PATH="/opt/venv/bin:$PATH"
19
20# Install Python dependencies
21COPY requirements.txt .
22RUN pip install --no-cache-dir -r requirements.txt
23
24# Production stage
25FROM python:3.11-slim
26
27# Install runtime dependencies
28RUN apt-get update && apt-get install -y \
29    libpq5 \
30    curl \
31    && rm -rf /var/lib/apt/lists/*
32
33# Copy virtual environment from builder
34COPY --from=builder /opt/venv /opt/venv
35ENV PATH="/opt/venv/bin:$PATH"
36
37# Create non-root user
38RUN groupadd -r mcpuser && useradd -r -g mcpuser mcpuser
39
40# Create application directory
41WORKDIR /app
42
43# Copy application code
44COPY --chown=mcpuser:mcpuser . .
45
46# Create logs directory
47RUN mkdir -p /app/logs && chown mcpuser:mcpuser /app/logs
48
49# Switch to non-root user
50USER mcpuser
51
52# Health check
53HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
54    CMD python health_check.py
55
56# Expose port (if needed for monitoring)
57EXPOSE 8080
58
59# Run the MCP server
60CMD ["python", "advanced_server.py"]
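The builder stage copies a `requirements.txt` that this guide has not shown. Based on the imports used across the modules above, a plausible starting point looks like this (package names follow the imports; add version pins appropriate to your environment):

```
mcp
asyncpg
aiomysql
aiosqlite
aiohttp
PyJWT
redis  # for the Redis cache layer, if used
```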

docker-compose.yml:

  1version: '3.8'
  2
  3services:
  4  # PostgreSQL Database
  5  postgres:
  6    image: postgres:15-alpine
  7    container_name: mcp-postgres
  8    environment:
  9      POSTGRES_DB: ${POSTGRES_DB:-mcp_database}
 10      POSTGRES_USER: ${POSTGRES_USER:-mcp_user}
 11      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-mcp_password}
 12    volumes:
 13      - postgres_data:/var/lib/postgresql/data
 14      - ./init-scripts:/docker-entrypoint-initdb.d
 15    ports:
 16      - "${POSTGRES_PORT:-5432}:5432"
 17    networks:
 18      - mcp-network
 19    restart: unless-stopped
 20    healthcheck:
 21      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-mcp_user}"]
 22      interval: 30s
 23      timeout: 10s
 24      retries: 3
 25
 26  # Redis Cache
 27  redis:
 28    image: redis:7-alpine
 29    container_name: mcp-redis
 30    command: redis-server --appendonly yes --requirepass ${REDIS_PASSWORD:-redis_password}
 31    volumes:
 32      - redis_data:/data
 33    ports:
 34      - "${REDIS_PORT:-6379}:6379"
 35    networks:
 36      - mcp-network
 37    restart: unless-stopped
 38    healthcheck:
 39      test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD:-redis_password}", "ping"]
 40      interval: 30s
 41      timeout: 10s
 42      retries: 3
 43
 44  # MCP Server
 45  mcp-server:
 46    build: .
 47    container_name: mcp-advanced-server
 48    environment:
 49      - POSTGRES_HOST=postgres
 50      - POSTGRES_PORT=5432
 51      - POSTGRES_DB=${POSTGRES_DB:-mcp_database}
 52      - POSTGRES_USER=${POSTGRES_USER:-mcp_user}
 53      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-mcp_password}
 54      - REDIS_HOST=redis
 55      - REDIS_PORT=6379
 56      - REDIS_PASSWORD=${REDIS_PASSWORD:-redis_password}
 57      - LOG_LEVEL=${LOG_LEVEL:-INFO}
 58      - MCP_SERVER_ENV=production
 59    volumes:
 60      - ./config:/app/config:ro
 61      - ./logs:/app/logs
 62      - mcp_data:/app/data
 63    depends_on:
 64      postgres:
 65        condition: service_healthy
 66      redis:
 67        condition: service_healthy
 68    networks:
 69      - mcp-network
 70    restart: unless-stopped
 71    healthcheck:
 72      test: ["CMD", "python", "health_check.py"]
 73      interval: 30s
 74      timeout: 10s
 75      retries: 3
 76
 77  # Monitoring with Prometheus
 78  prometheus:
 79    image: prom/prometheus:latest
 80    container_name: mcp-prometheus
 81    ports:
 82      - "9090:9090"
 83    volumes:
 84      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
 85      - prometheus_data:/prometheus
 86    command:
 87      - '--config.file=/etc/prometheus/prometheus.yml'
 88      - '--storage.tsdb.path=/prometheus'
 89      - '--web.console.libraries=/etc/prometheus/console_libraries'
 90      - '--web.console.templates=/etc/prometheus/consoles'
 91    networks:
 92      - mcp-network
 93    restart: unless-stopped
 94
 95  # Grafana Dashboard
 96  grafana:
 97    image: grafana/grafana:latest
 98    container_name: mcp-grafana
 99    ports:
100      - "3000:3000"
101    environment:
102      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD:-admin123}
103    volumes:
104      - grafana_data:/var/lib/grafana
105      - ./monitoring/grafana/provisioning:/etc/grafana/provisioning:ro
106    networks:
107      - mcp-network
108    restart: unless-stopped
109
110volumes:
111  postgres_data:
112  redis_data:
113  mcp_data:
114  prometheus_data:
115  grafana_data:
116
117networks:
118  mcp-network:
119    driver: bridge
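Inside the container, the server reads the variables that docker-compose injects into the `mcp-server` service. A minimal sketch of that wiring (variable names follow the compose file above; the helper itself is illustrative):

```python
import os

def load_db_settings() -> dict:
    """Collect PostgreSQL connection settings from the environment,
    with the same defaults as the docker-compose file."""
    return {
        "host": os.environ.get("POSTGRES_HOST", "postgres"),
        "port": int(os.environ.get("POSTGRES_PORT", "5432")),
        "database": os.environ.get("POSTGRES_DB", "mcp_database"),
        "user": os.environ.get("POSTGRES_USER", "mcp_user"),
        "password": os.environ.get("POSTGRES_PASSWORD", "mcp_password"),
    }
```

Keeping defaults in one place means the same image runs unchanged in compose, Kubernetes, or locally, with only the environment differing.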

📊 Monitoring & Observability

monitoring/prometheus.yml:

 1global:
 2  scrape_interval: 15s
 3  evaluation_interval: 15s
 4
 5rule_files:
 6  - "rules/*.yml"
 7
 8scrape_configs:
 9  - job_name: 'mcp-server'
10    static_configs:
11      - targets: ['mcp-server:8080']
12    metrics_path: /metrics
13    scrape_interval: 10s
14
 15  - job_name: 'postgresql'   # assumes a postgres_exporter sidecar (PostgreSQL has no native /metrics)
 16    static_configs:
 17      - targets: ['postgres-exporter:9187']
 18
 19  - job_name: 'redis'        # likewise assumes a redis_exporter sidecar
 20    static_configs:
 21      - targets: ['redis-exporter:9121']
22
23alerting:
24  alertmanagers:
25    - static_configs:
26        - targets:
 27          - alertmanager:9093  # requires a separate Alertmanager deployment
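Prometheus scrapes `mcp-server:8080/metrics` in the plain-text exposition format, so the server has to render its performance stats that way. A dependency-free sketch of such a renderer (the metric name is illustrative, not an existing endpoint in the code above):

```python
def render_metrics(stats: dict) -> str:
    """Render per-endpoint stats as Prometheus text exposition format."""
    lines = [
        "# HELP mcp_requests_total Total API requests per endpoint",
        "# TYPE mcp_requests_total counter",
    ]
    for endpoint, s in stats.items():
        # Labels are quoted; the sample value follows after a space
        lines.append(f'mcp_requests_total{{endpoint="{endpoint}"}} {s["total_requests"]}')
    return "\n".join(lines) + "\n"

print(render_metrics({"users_api": {"total_requests": 42}}))
```

In practice the `prometheus_client` library handles escaping, histograms, and the HTTP endpoint for you; the point here is only what the scraped payload looks like.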

health_check.py:

 1#!/usr/bin/env python3
 2"""
 3Health check script for MCP server container
 4"""
 5
 6import asyncio
 7import sys
 8import json
 9from pathlib import Path
10
11async def check_health():
12    """Perform health checks"""
13    try:
14        # Check if server configuration exists
15        config_path = Path("advanced_config.json")
16        if not config_path.exists():
17            print("❌ Configuration file missing")
18            return False
19
20        # Try to load configuration
21        with open(config_path) as f:
22            config = json.load(f)
23
24        print("✅ Configuration loaded successfully")
25
26        # Check database connectivity
27        # Note: In a real implementation, you'd test actual DB connections
28        if 'databases' in config:
29            print(f"✅ {len(config['databases'])} database(s) configured")
30
31        # Check API endpoints
32        if 'api_endpoints' in config:
33            print(f"✅ {len(config['api_endpoints'])} API endpoint(s) configured")
34
35        print("✅ Health check passed")
36        return True
37
38    except Exception as e:
39        print(f"❌ Health check failed: {e}")
40        return False
41
42if __name__ == "__main__":
43    result = asyncio.run(check_health())
44    sys.exit(0 if result else 1)
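The comment in `check_health` notes that a real implementation would test actual connections. Short of pulling in a database driver, a TCP reachability probe catches the most common failure mode (service down or unreachable); this stdlib-only sketch is one way to do it:

```python
import socket

def port_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or unresolvable
        return False
```

A fuller health check would go one step further and run a trivial query (e.g. `SELECT 1`) through the real driver, since a port can accept connections while the database is still unable to serve queries.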

🔐 Security & Authentication

🛡️ Security Configuration

security_manager.py:

  1#!/usr/bin/env python3
  2"""
  3Security manager for MCP server
  4Handles authentication, authorization, and audit logging
  5"""
  6
  7import jwt
  8import hashlib
  9import hmac
 10import time
 11import logging
 12from typing import Dict, List, Any, Optional
 13from datetime import datetime, timedelta
 14from dataclasses import dataclass
 15import json
 16
 17logger = logging.getLogger(__name__)
 18
 19@dataclass
 20class User:
 21    """User information"""
 22    username: str
 23    roles: List[str]
 24    permissions: List[str]
 25    api_key: Optional[str] = None
 26    expires_at: Optional[datetime] = None
 27
 28@dataclass
 29class AuditEvent:
 30    """Audit event information"""
 31    timestamp: datetime
 32    user: str
 33    action: str
 34    resource: str
 35    details: Dict[str, Any]
 36    success: bool
 37    ip_address: Optional[str] = None
 38
 39class SecurityManager:
 40    """Advanced security manager for MCP server"""
 41
 42    def __init__(self, config: Dict[str, Any]):
 43        self.config = config
 44        self.jwt_secret = config.get('jwt_secret', 'default-secret-change-me')
 45        self.jwt_algorithm = config.get('jwt_algorithm', 'HS256')
 46        self.jwt_expiration = config.get('jwt_expiration', 3600)  # 1 hour
 47
 48        self.users = {}  # In production, use database
 49        self.audit_events = []
 50        self.rate_limits = {}
 51
 52        self._load_users()
 53
 54    def _load_users(self):
 55        """Load users from configuration"""
 56        for user_config in self.config.get('users', []):
 57            user = User(
 58                username=user_config['username'],
 59                roles=user_config.get('roles', []),
 60                permissions=user_config.get('permissions', []),
 61                api_key=user_config.get('api_key')
 62            )
 63            self.users[user.username] = user
 64
 65    def create_jwt_token(self, username: str) -> str:
 66        """Create JWT token for user"""
 67        if username not in self.users:
 68            raise ValueError(f"User {username} not found")
 69
 70        user = self.users[username]
 71
 72        payload = {
 73            'username': username,
 74            'roles': user.roles,
 75            'permissions': user.permissions,
 76            'iat': time.time(),
 77            'exp': time.time() + self.jwt_expiration
 78        }
 79
 80        return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)
 81
 82    def verify_jwt_token(self, token: str) -> Optional[Dict[str, Any]]:
 83        """Verify JWT token and return payload"""
 84        try:
 85            payload = jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
 86            return payload
 87        except jwt.ExpiredSignatureError:
 88            logger.warning("JWT token expired")
 89            return None
        except jwt.InvalidTokenError:
            logger.warning("Invalid JWT token")
            return None

    def verify_api_key(self, api_key: str) -> Optional[User]:
        """Verify API key and return user"""
        for user in self.users.values():
            if user.api_key and hmac.compare_digest(user.api_key, api_key):
                return user
        return None

    def check_permission(self, user: User, action: str, resource: str) -> bool:
        """Check if user has permission for action on resource"""
        # Admin role has all permissions
        if 'admin' in user.roles:
            return True

        # Check specific permissions
        required_permission = f"{action}:{resource}"
        if required_permission in user.permissions:
            return True

        # Check wildcard permissions
        wildcard_permission = f"{action}:*"
        if wildcard_permission in user.permissions:
            return True

        return False

    def check_rate_limit(self, user: User, action: str, limit: int, window: int = 60) -> bool:
        """Check rate limit for user action"""
        key = f"{user.username}:{action}"
        now = time.time()

        if key not in self.rate_limits:
            self.rate_limits[key] = []

        # Clean old entries
        self.rate_limits[key] = [
            timestamp for timestamp in self.rate_limits[key]
            if timestamp > now - window
        ]

        # Check limit
        if len(self.rate_limits[key]) >= limit:
            return False

        # Record this request
        self.rate_limits[key].append(now)
        return True

    def log_audit_event(
        self,
        user: str,
        action: str,
        resource: str,
        details: Dict[str, Any],
        success: bool,
        ip_address: Optional[str] = None
    ):
        """Log audit event"""
        event = AuditEvent(
            timestamp=datetime.now(),
            user=user,
            action=action,
            resource=resource,
            details=details,
            success=success,
            ip_address=ip_address
        )

        self.audit_events.append(event)

        # Keep only last 10000 events in memory
        if len(self.audit_events) > 10000:
            self.audit_events = self.audit_events[-10000:]

        # Log to file/database
        logger.info(f"AUDIT: {user} {action} {resource} {'SUCCESS' if success else 'FAILED'}")

    def get_audit_events(
        self,
        user: Optional[str] = None,
        action: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Get audit events with filters"""
        filtered_events = []

        for event in self.audit_events:
            # Apply filters
            if user and event.user != user:
                continue
            if action and event.action != action:
                continue
            if start_time and event.timestamp < start_time:
                continue
            if end_time and event.timestamp > end_time:
                continue

            filtered_events.append({
                'timestamp': event.timestamp.isoformat(),
                'user': event.user,
                'action': event.action,
                'resource': event.resource,
                'details': event.details,
                'success': event.success,
                'ip_address': event.ip_address
            })

        # Sort by timestamp descending and limit
        filtered_events.sort(key=lambda x: x['timestamp'], reverse=True)
        return filtered_events[:limit]

    def get_security_stats(self) -> Dict[str, Any]:
        """Get security statistics"""
        total_events = len(self.audit_events)
        successful_events = sum(1 for event in self.audit_events if event.success)
        failed_events = total_events - successful_events

        # Count events by action
        action_counts = {}
        for event in self.audit_events:
            action_counts[event.action] = action_counts.get(event.action, 0) + 1

        # Count events by user
        user_counts = {}
        for event in self.audit_events:
            user_counts[event.user] = user_counts.get(event.user, 0) + 1

        return {
            'total_events': total_events,
            'successful_events': successful_events,
            'failed_events': failed_events,
            'success_rate': successful_events / total_events if total_events > 0 else 0,
            'events_by_action': action_counts,
            'events_by_user': user_counts,
            'active_users': len(self.users),
            'rate_limit_keys': len(self.rate_limits)
        }

🎯 Complete Advanced Configuration

advanced_config.json:

{
  "server": {
    "name": "advanced-mcp-server",
    "version": "2.0.0",
    "description": "Advanced MCP server with database and API integration",
    "environment": "production"
  },
  "databases": {
    "primary_db": {
      "db_type": "postgresql",
      "host": "localhost",
      "port": 5432,
      "database": "mcp_database",
      "username": "mcp_user",
      "password": "mcp_password",
      "pool_size": 20,
      "timeout": 30
    },
    "analytics_db": {
      "db_type": "mysql",
      "host": "localhost",
      "port": 3306,
      "database": "analytics",
      "username": "analytics_user",
      "password": "analytics_password",
      "pool_size": 10
    },
    "cache_db": {
      "db_type": "sqlite",
      "file_path": "./data/cache.db"
    }
  },
  "api_endpoints": {
    "github_api": {
      "name": "GitHub API",
      "base_url": "https://api.github.com",
      "endpoints": {
        "repos": "/repos/{owner}/{repo}",
        "issues": "/repos/{owner}/{repo}/issues",
        "users": "/users/{username}"
      },
      "auth_type": "bearer",
      "auth_config": {
        "token": "${GITHUB_TOKEN}"
      },
      "headers": {
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "MCP-Server/2.0"
      },
      "rate_limit": 5000,
      "timeout": 30
    },
    "internal_api": {
      "name": "Internal API",
      "base_url": "https://internal.company.com/api",
      "endpoints": {
        "users": "/users",
        "projects": "/projects",
        "metrics": "/metrics"
      },
      "auth_type": "api_key",
      "auth_config": {
        "key_name": "X-API-Key",
        "key": "${INTERNAL_API_KEY}"
      },
      "rate_limit": 1000
    }
  },
  "security": {
    "jwt_secret": "${JWT_SECRET}",
    "jwt_algorithm": "HS256",
    "jwt_expiration": 3600,
    "users": [
      {
        "username": "admin",
        "roles": ["admin"],
        "permissions": ["*:*"],
        "api_key": "${ADMIN_API_KEY}"
      },
      {
        "username": "developer",
        "roles": ["developer"],
        "permissions": [
          "query:database",
          "read:schema",
          "request:api",
          "test:endpoints"
        ],
        "api_key": "${DEVELOPER_API_KEY}"
      }
    ],
    "rate_limits": {
      "default": 100,
      "database_query": 50,
      "api_request": 200
    }
  },
  "logging": {
    "level": "INFO",
    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    "file": "logs/advanced-mcp-server.log",
    "max_size": "10MB",
    "backup_count": 5
  },
  "monitoring": {
    "metrics_enabled": true,
    "metrics_port": 8080,
    "health_check_enabled": true,
    "performance_tracking": true
  }
}

🎉 Conclusion & Next Steps

✅ What We’ve Accomplished

In Part 2, we’ve built a production-ready MCP server with:

🗄️ Database Integration:

  • Multi-database support (PostgreSQL, MySQL, SQLite)
  • Connection pooling and performance monitoring
  • Safe query execution with security validation
  • Schema introspection and query building assistance

🌐 API Integration:

  • REST API connector with authentication
  • Rate limiting and retry mechanisms
  • Comprehensive API testing suite
  • Performance monitoring and health checks

🐳 Production Deployment:

  • Docker containerization with multi-stage builds
  • Docker Compose for full stack deployment
  • Health checks and monitoring integration
  • Prometheus and Grafana observability

🔐 Security Features:

  • JWT token authentication
  • API key management
  • Role-based access control
  • Audit logging and security monitoring

🚀 Claude Code Integration

Your Claude Code can now:

Database Operations:

"Query the user table in our production database and show the top 10 most active users"
"Get the schema information for the orders table"
"Explain the execution plan for this complex query"

API Testing:

"Test the health of our GitHub API integration"
"Run a comprehensive test suite on our internal API endpoints"
"Make a request to create a new repository via the GitHub API"

Monitoring & Analytics:

"Show me the performance statistics for all our API endpoints"
"Get the audit log for database operations in the last hour"
"Check the health status of all connected systems"

🔮 Part 3 Preview

In the upcoming Part 3, we’ll cover:

Advanced Topics:

  • Real-time WebSocket connections for live data streams
  • Message queue integration (Redis, RabbitMQ, Apache Kafka)
  • Machine learning model integration for AI-powered tools
  • Custom plugin architecture for extensible functionality

Enterprise Features:

  • Multi-tenant architecture for SaaS deployments
  • Advanced caching strategies with Redis clustering
  • Distributed tracing with OpenTelemetry
  • High availability and disaster recovery patterns

Advanced Security:

  • OAuth2 and SAML integration for enterprise SSO
  • Certificate-based authentication for service-to-service communication
  • Data encryption at rest and in transit
  • Compliance frameworks (SOC2, GDPR, HIPAA)

Stay tuned for the final part of this comprehensive MCP server development series!


🔗 Resources & References

| Resource | Link |
| --- | --- |
| 📂 Part 1 | Basic MCP Server Setup |
| 🐳 Docker Hub | Official PostgreSQL Image |
| 📖 API Documentation | MCP Protocol Specification |
| 🛠️ Example Repository | Advanced MCP Server Examples |
Yen
