Modern cloud-native applications running on Kubernetes generate massive amounts of log data across multiple services, pods, and infrastructure components. Without a centralized logging strategy, debugging issues, monitoring system health, and gaining operational insights become nearly impossible. This post explores building a production-ready centralized logging platform using OpenSearch, Kinesis Data Firehose, and AWS CDK.
The Challenge: Taming Kubernetes Log Complexity
Kubernetes environments present unique logging challenges that traditional approaches struggle to address:
- Distributed Log Sources: Logs scattered across multiple pods, nodes, and services
- Dynamic Infrastructure: Containers and pods that come and go, making log correlation difficult
- Volume and Velocity: High-throughput applications generating millions of log entries per day
- Multi-Format Data: Structured JSON logs mixed with unstructured application logs
- Operational Overhead: Manual log aggregation and searching across multiple sources
- Retention and Cost: Balancing log retention needs with storage costs
Why OpenSearch + Kinesis for Centralized Logging?
Before diving into the implementation, let’s understand why this technology combination excels for Kubernetes logging:
OpenSearch: Elasticsearch-Compatible Search and Analytics
OpenSearch provides powerful search and analytics capabilities specifically designed for log data:
```json
{
  "timestamp": "2024-12-15T14:30:00Z",
  "kubernetes": {
    "namespace": "production",
    "pod_name": "api-server-7d84f9b8c-k5x2p",
    "container": "api-server"
  },
  "level": "ERROR",
  "message": "Database connection failed",
  "request_id": "req-123456789"
}
```
Key advantages:
- Full-text search across all log fields and message content
- Time-series analysis for performance monitoring and trend analysis
- Flexible querying with complex filters and aggregations
- Dashboards and visualizations through OpenSearch Dashboards
- Cost-effective compared to managed Elasticsearch alternatives
Kinesis Data Firehose: Reliable Log Delivery
Kinesis Data Firehose provides reliable, scalable log delivery with built-in transformation:
| Traditional Approach | Kinesis Firehose Approach |
|---|---|
| Custom log shipping agents | Managed delivery service |
| Manual scaling and monitoring | Automatic scaling |
| Data transformation complexity | Built-in Lambda transformation |
| Single points of failure | Built-in redundancy |
Architecture Overview: End-to-End Log Pipeline
Our centralized logging architecture follows a modern, serverless approach that scales automatically and requires minimal operational overhead:
```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   EKS Cluster   │     │   CloudWatch    │     │     Kinesis     │
│                 │ ──> │      Logs       │ ──> │  Data Firehose  │
│  • Pod Logs     │     │                 │     │                 │
│  • App Logs     │     └─────────────────┘     └────────┬────────┘
└─────────────────┘                                      │
                                 ┌───────────────────────┘
                                 v
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│       S3        │     │     Lambda      │     │   OpenSearch    │
│  (Error Logs)   │ <── │ Transformation  │ ──> │    Service      │
│                 │     │                 │     │                 │
└─────────────────┘     └─────────────────┘     └─────────────────┘
```
Data Flow Breakdown:
- Log Generation: Kubernetes pods and applications generate logs
- Collection: CloudWatch Logs captures logs via subscription filters (see the CDK sketch after this list)
- Streaming: Kinesis Data Firehose receives log streams in real-time
- Transformation: Lambda functions process and enrich log data
- Indexing: Processed logs are indexed in OpenSearch for analysis
- Error Handling: Failed records are stored in S3 for retry and debugging
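The subscription filter is the piece that wires CloudWatch Logs into Firehose. A minimal CDK sketch of that wiring is below; the construct names and the `cwlToFirehoseRole` role are illustrative, not taken from the original stack:

```typescript
import * as iam from 'aws-cdk-lib/aws-iam';
import * as logs from 'aws-cdk-lib/aws-logs';

// Role that CloudWatch Logs assumes to write into the delivery stream
const cwlToFirehoseRole = new iam.Role(this, 'CwlToFirehoseRole', {
  assumedBy: new iam.ServicePrincipal('logs.amazonaws.com'),
});
cwlToFirehoseRole.addToPolicy(new iam.PolicyStatement({
  actions: ['firehose:PutRecord', 'firehose:PutRecordBatch'],
  resources: [deliveryStream.attrArn],
}));

// Forward every event in the pod log group to Firehose
new logs.CfnSubscriptionFilter(this, 'PodLogSubscription', {
  logGroupName: '/aws/eks/MyCluster/application',
  filterPattern: '', // an empty pattern matches all events
  destinationArn: deliveryStream.attrArn,
  roleArn: cwlToFirehoseRole.roleArn,
});
```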
Infrastructure as Code: AWS CDK Implementation
Let’s explore the key components of our CDK implementation that makes this architecture deployable and maintainable.
Configuration-Driven Architecture
The stack uses environment-specific configurations to support multiple deployment scenarios:
```json
// default-values.json
{
  "dev": {
    "opensearchInstanceType": "t3.small.search",
    "opensearchInstanceCount": 1,
    "firehoseBufferSize": 5,
    "logRetentionDays": 7
  },
  "prod": {
    "opensearchInstanceType": "r6g.large.search",
    "opensearchInstanceCount": 3,
    "firehoseBufferSize": 128,
    "logRetentionDays": 365
  }
}
```
This approach enables:
- Environment consistency with different resource sizing
- Cost optimization for development environments
- Production reliability with appropriate redundancy
- Easy configuration management without code changes
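The post doesn't show how the stack consumes this file; one plausible loading pattern selects the environment via CDK context (the file name and keys match the example above, the rest is illustrative):

```typescript
import * as fs from 'fs';
import * as cdk from 'aws-cdk-lib';

const app = new cdk.App();

// Pick the environment via `cdk deploy --context env=prod` (defaults to dev)
const envName: string = app.node.tryGetContext('env') ?? 'dev';
const allConfigs = JSON.parse(fs.readFileSync('default-values.json', 'utf-8'));
const config = allConfigs[envName];
if (!config) {
  throw new Error(`No configuration found for environment "${envName}"`);
}
```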
OpenSearch Domain Setup
The OpenSearch domain is configured for high availability and security:
```typescript
const opensearchDomain = new opensearch.Domain(this, 'LoggingDomain', {
  version: opensearch.EngineVersion.OPENSEARCH_2_3,
  capacity: {
    masterNodes: config.opensearch.masterNodeCount,
    masterNodeInstanceType: config.opensearch.masterInstanceType,
    dataNodes: config.opensearch.dataNodeCount,
    dataNodeInstanceType: config.opensearch.dataInstanceType,
  },
  // Spread data nodes across availability zones when running more than one
  zoneAwareness: { enabled: config.opensearch.dataNodeCount > 1 },
  ebs: {
    volumeSize: config.opensearch.volumeSize,
    volumeType: ec2.EbsDeviceVolumeType.GP3,
  },
  nodeToNodeEncryption: true,
  encryptionAtRest: { enabled: true },
  enforceHttps: true,
  accessPolicies: [
    new iam.PolicyStatement({
      principals: [new iam.ServicePrincipal('firehose.amazonaws.com')],
      actions: ['es:ESHttpPost', 'es:ESHttpPut'],
      resources: ['*'],
    }),
  ],
});
```
Key architectural decisions:
- Multi-AZ deployment for high availability
- Encryption at rest and in transit for security compliance
- Fine-grained access control limiting service access
- GP3 storage for cost-effective performance
Kinesis Data Firehose Configuration
The Firehose delivery stream handles log transportation with automatic retry and error handling:
```typescript
const deliveryStream = new kinesisFirehose.CfnDeliveryStream(this, 'LogDeliveryStream', {
  deliveryStreamType: 'DirectPut',
  deliveryStreamName: `${config.stackName}-logs`,
  amazonopensearchserviceDestinationConfiguration: {
    domainArn: opensearchDomain.domainArn,
    indexName: 'application-logs',
    roleArn: firehoseRole.roleArn,
    processingConfiguration: {
      enabled: true,
      processors: [{
        type: 'Lambda',
        parameters: [{
          parameterName: 'LambdaArn',
          parameterValue: transformFunction.functionArn,
        }],
      }],
    },
    bufferingHints: {
      sizeInMBs: config.firehose.bufferSize,
      intervalInSeconds: config.firehose.bufferInterval,
    },
    retryOptions: {
      durationInSeconds: 3600,
    },
    s3BackupMode: 'FailedDocumentsOnly',
    s3Configuration: {
      roleArn: firehoseRole.roleArn,
      bucketArn: errorBucket.bucketArn,
      prefix: 'failed-logs/',
    },
  },
});
```
Performance optimizations:
- Configurable buffering to balance latency and throughput
- Lambda transformation for data enrichment and formatting
- Automatic retry with exponential backoff for transient failures
- S3 backup for failed records to prevent data loss
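The `transformFunction` and `firehoseRole` referenced in the delivery stream are defined elsewhere in the stack. A minimal sketch of the transformation function's definition follows; the handler path, runtime, and sizing here are assumptions, not the repository's actual values:

```typescript
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';

const transformFunction = new lambda.Function(this, 'LogTransformFunction', {
  runtime: lambda.Runtime.PYTHON_3_12,
  handler: 'transform.lambda_handler',
  code: lambda.Code.fromAsset('lambda'),
  // Firehose allows transformation invocations of up to 5 minutes
  timeout: cdk.Duration.minutes(5),
  memorySize: 512,
});

// The Firehose delivery role must be able to invoke the transformer
transformFunction.grantInvoke(firehoseRole);
```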
Log Transformation: Lambda Processing Pipeline
The Lambda function handles log transformation, enrichment, and formatting for optimal OpenSearch indexing:
Log Processing Logic
```python
import base64
import gzip
import json
import re
from datetime import datetime, timezone

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # CloudWatch Logs delivers gzipped, base64-encoded payloads
        compressed_payload = base64.b64decode(record['data'])
        uncompressed_payload = gzip.decompress(compressed_payload)
        log_data = json.loads(uncompressed_payload)

        # Drop CloudWatch Logs control messages (subscription health checks)
        if log_data.get('messageType') == 'CONTROL_MESSAGE':
            output.append({'recordId': record['recordId'], 'result': 'Dropped'})
            continue

        # Firehose expects exactly one output record per input recordId, so
        # join the processed events into a single newline-delimited payload
        processed_events = [
            process_log_event(log_event, log_data)
            for log_event in log_data['logEvents']
        ]
        payload = '\n'.join(json.dumps(e) for e in processed_events)

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8'),
        })

    return {'records': output}

def process_log_event(log_event, metadata):
    """Process and enrich individual log events"""

    # Extract Kubernetes metadata
    log_group = metadata.get('logGroup', '')
    kubernetes_info = parse_kubernetes_metadata(log_group)

    # Parse application log message
    message = log_event['message']
    parsed_message = parse_application_log(message)

    # Create enriched log document
    enriched_log = {
        '@timestamp': datetime.fromtimestamp(
            log_event['timestamp'] / 1000, tz=timezone.utc
        ).isoformat(),
        'kubernetes': kubernetes_info,
        'log_level': parsed_message.get('level', 'INFO'),
        'message': parsed_message.get('message', message),
        'request_id': parsed_message.get('request_id'),
        'source': {
            'log_group': log_group,
            'log_stream': metadata.get('logStream', ''),
        }
    }

    return enriched_log

def parse_application_log(message):
    """Minimal example parser: structured JSON logs pass through;
    otherwise grab a leading log level if one is present. A real
    implementation would match your application's log format."""
    try:
        parsed = json.loads(message)
        if isinstance(parsed, dict):
            return parsed
    except ValueError:
        pass

    match = re.match(r'\s*(DEBUG|INFO|WARN|WARNING|ERROR|FATAL)\b', message)
    return {'level': match.group(1) if match else 'INFO', 'message': message}
```
Kubernetes Metadata Extraction
The transformation function extracts valuable Kubernetes context from CloudWatch log groups:
```python
def parse_kubernetes_metadata(log_group):
    """Extract Kubernetes metadata from log group names"""

    # Example: /aws/eks/my-cluster/application/namespace/pod-name
    parts = log_group.split('/')

    if 'eks' in parts:
        return {
            'cluster_name': parts[3] if len(parts) > 3 else 'unknown',
            'namespace': parts[5] if len(parts) > 5 else 'default',
            'pod_name': parts[6] if len(parts) > 6 else 'unknown',
        }

    return {'source': 'application'}
```
Deployment and Configuration Management
The CDK stack supports flexible deployment scenarios through configuration-driven infrastructure:
Multi-Environment Deployment
Deploy different configurations for various environments:
```bash
# Development environment with minimal resources
cdk deploy --all --context env=dev

# Production environment with high availability
cdk deploy --all --context env=prod \
  --context enableVpc=true \
  --context opensearchInstanceCount=3
```
Selective Log Stream Configuration
Configure specific log groups for OpenSearch ingestion:
```bash
# Deploy with specific EKS log groups
cdk deploy --all \
  --context eksPodGroup="/aws/eks/MyCluster/application" \
  --context logRetentionDays=30
```
VPC Integration
For enhanced security, deploy OpenSearch within a VPC:
```typescript
// Enable VPC deployment
const vpcStack = new VpcStack(app, 'LoggingVpcStack', {
  env: { region: 'us-west-2' }
});

const opensearchStack = new OpenSearchStack(app, 'OpenSearchStack', {
  vpc: vpcStack.vpc,
  env: { region: 'us-west-2' }
});
```
Monitoring and Observability
A centralized logging platform requires comprehensive monitoring of its own performance:
Key Metrics to Track
```typescript
// CloudWatch metrics for monitoring
const logIngestionRate = new cloudwatch.Metric({
  namespace: 'AWS/Firehose',
  metricName: 'IncomingRecords',
  dimensionsMap: {
    DeliveryStreamName: deliveryStream.deliveryStreamName!,
  },
});

// Rejected write (indexing) operations on the domain
const opensearchIndexingErrors = new cloudwatch.Metric({
  namespace: 'AWS/ES',
  metricName: 'ThreadpoolWriteRejected',
  dimensionsMap: {
    DomainName: opensearchDomain.domainName,
  },
});
```
Alerting Configuration
```typescript
// Alert on high error rates
new cloudwatch.Alarm(this, 'HighIndexingErrors', {
  metric: opensearchIndexingErrors,
  threshold: 10,
  evaluationPeriods: 2,
  treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING,
});

// Alert on low log ingestion (potential pipeline issues)
new cloudwatch.Alarm(this, 'LowLogIngestion', {
  metric: logIngestionRate,
  threshold: 100,
  evaluationPeriods: 3,
  comparisonOperator: cloudwatch.ComparisonOperator.LESS_THAN_THRESHOLD,
});
```
Production Considerations and Best Practices
Cost Optimization Strategies
- Right-sizing OpenSearch instances based on actual usage patterns
- Implementing log lifecycle policies for automated data archival (see the policy sketch after this list)
- Using reserved instances for predictable workloads
- Configuring appropriate buffer sizes to optimize Firehose costs
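For the lifecycle policies, OpenSearch's Index State Management (ISM) can age out old indices automatically. A minimal sketch that deletes application-log indices after 30 days; the index pattern and retention window are illustrative:

```json
{
  "policy": {
    "description": "Delete application log indices after 30 days",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [],
        "transitions": [
          { "state_name": "delete", "conditions": { "min_index_age": "30d" } }
        ]
      },
      {
        "name": "delete",
        "actions": [{ "delete": {} }],
        "transitions": []
      }
    ],
    "ism_template": { "index_patterns": ["application-logs*"] }
  }
}
```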
Security and Compliance
```typescript
// Fine-grained access control
const logAnalystRole = new iam.Role(this, 'LogAnalystRole', {
  assumedBy: new iam.ServicePrincipal('opensearch.amazonaws.com'),
  inlinePolicies: {
    LogSearchPolicy: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ['es:ESHttpGet', 'es:ESHttpPost'],
          resources: [`${opensearchDomain.domainArn}/application-logs/_search`],
        }),
      ],
    }),
  },
});
```
Disaster Recovery
- Cross-region replication for critical log data
- Automated backup strategies using S3 lifecycle policies (see the sketch after this list)
- Infrastructure versioning through CDK and Git
- Runbook procedures for common failure scenarios
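On the S3 side, the `errorBucket` referenced in the Firehose configuration could be declared with lifecycle rules along these lines; the transition and expiration windows are illustrative:

```typescript
import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';

const errorBucket = new s3.Bucket(this, 'FailedLogsBucket', {
  lifecycleRules: [{
    // Tier failed records to Glacier after 90 days, delete after a year
    transitions: [{
      storageClass: s3.StorageClass.GLACIER,
      transitionAfter: cdk.Duration.days(90),
    }],
    expiration: cdk.Duration.days(365),
  }],
});
```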
Real-World Usage Patterns
Application Performance Monitoring
Query examples for common operational scenarios:
```json
{
  "query": {
    "bool": {
      "must": [
        {"range": {"@timestamp": {"gte": "now-1h"}}},
        {"term": {"log_level": "ERROR"}},
        {"exists": {"field": "request_id"}}
      ]
    }
  },
  "aggs": {
    "error_by_service": {
      "terms": {"field": "kubernetes.namespace"}
    }
  }
}
```
Debugging Distributed Transactions
Trace requests across multiple services using correlation IDs:
```json
{
  "query": {
    "term": {"request_id": "req-123456789"}
  },
  "sort": [{"@timestamp": "asc"}]
}
```
Conclusion: Building Reliable Observability
This centralized logging architecture provides a foundation for comprehensive Kubernetes observability that scales with your infrastructure. The combination of OpenSearch’s powerful search capabilities, Kinesis Firehose’s reliable delivery, and CDK’s infrastructure-as-code approach creates a maintainable, cost-effective solution.
Key Takeaways
- Serverless architecture reduces operational overhead while providing automatic scaling
- Configuration-driven deployment enables consistent multi-environment management
- Comprehensive error handling prevents log data loss during system failures
- Built-in monitoring provides visibility into the logging pipeline itself
- Cost optimization through right-sizing and lifecycle policies
As your Kubernetes infrastructure grows, this logging platform provides the observability foundation needed for maintaining reliable, performant applications at scale.
The complete implementation is available in the CDK Playground repository, including deployment scripts, configuration examples, and monitoring templates.