Building Centralized Logging with OpenSearch and AWS CDK

Modern cloud-native applications running on Kubernetes generate massive amounts of log data across multiple services, pods, and infrastructure components. Without a centralized logging strategy, debugging issues, monitoring system health, and gaining operational insights become nearly impossible. This post explores building a production-ready centralized logging platform using OpenSearch, Kinesis Data Firehose, and AWS CDK.

The Challenge: Taming Kubernetes Log Complexity

Kubernetes environments present unique logging challenges that traditional approaches struggle to address:

  • Distributed Log Sources: Logs scattered across multiple pods, nodes, and services
  • Dynamic Infrastructure: Containers and pods that come and go, making log correlation difficult
  • Volume and Velocity: High-throughput applications generating millions of log entries per day
  • Multi-Format Data: Structured JSON logs mixed with unstructured application logs
  • Operational Overhead: Manual log aggregation and searching across multiple sources
  • Retention and Cost: Balancing log retention needs with storage costs

Why OpenSearch + Kinesis for Centralized Logging?

Before diving into the implementation, let’s understand why this technology combination excels for Kubernetes logging:

OpenSearch: Elasticsearch-Compatible Search and Analytics

OpenSearch provides powerful search and analytics capabilities designed for log data. A typical enriched log document looks like this:

{
  "timestamp": "2024-12-15T14:30:00Z",
  "kubernetes": {
    "namespace": "production",
    "pod_name": "api-server-7d84f9b8c-k5x2p",
    "container": "api-server"
  },
  "level": "ERROR",
  "message": "Database connection failed",
  "request_id": "req-123456789"
}

Key advantages:

  • Full-text search across all log fields and message content
  • Time-series analysis for performance monitoring and trend analysis
  • Flexible querying with complex filters and aggregations
  • Dashboards and visualizations through OpenSearch Dashboards
  • Cost-effective compared to managed Elasticsearch alternatives

Kinesis Data Firehose: Reliable Log Delivery

Kinesis Data Firehose provides reliable, scalable log delivery with built-in transformation:

Traditional Approach             | Kinesis Firehose Approach
---------------------------------|---------------------------------
Custom log shipping agents       | Managed delivery service
Manual scaling and monitoring    | Automatic scaling
Data transformation complexity   | Built-in Lambda transformation
Single points of failure         | Built-in redundancy

Architecture Overview: End-to-End Log Pipeline

Our centralized logging architecture follows a modern, serverless approach that scales automatically and requires minimal operational overhead:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   EKS Cluster   │    │  CloudWatch     │    │   Kinesis       │
│                 │ -> │     Logs        │ -> │ Data Firehose   │
│ • Pod Logs      │    │                 │    │                 │
│ • App Logs      │    └─────────────────┘    └─────────────────┘
└─────────────────┘                                    │
                                                       v
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│      S3         │    │     Lambda      │    │   OpenSearch    │
│  (Error Logs)   │ <- │  Transformation │ -> │    Service      │
│                 │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Data Flow Breakdown:

  1. Log Generation: Kubernetes pods and applications generate logs
  2. Collection: CloudWatch Logs captures logs via subscription filters (a code sketch of this wiring follows the list)
  3. Streaming: Kinesis Data Firehose receives log streams in real-time
  4. Transformation: Lambda functions process and enrich log data
  5. Indexing: Processed logs are indexed in OpenSearch for analysis
  6. Error Handling: Failed records are stored in S3 for retry and debugging
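The subscription-filter wiring in step 2 is the one link this post doesn't otherwise show in code, so here is a minimal sketch. It assumes the `deliveryStream` construct defined in the Firehose section below; the role name and log group path are illustrative:

import * as iam from 'aws-cdk-lib/aws-iam';
import * as logs from 'aws-cdk-lib/aws-logs';

// Role that CloudWatch Logs assumes to write into the Firehose stream
const cwlToFirehoseRole = new iam.Role(this, 'CwlToFirehoseRole', {
  assumedBy: new iam.ServicePrincipal('logs.amazonaws.com'),
});
cwlToFirehoseRole.addToPolicy(new iam.PolicyStatement({
  actions: ['firehose:PutRecord', 'firehose:PutRecordBatch'],
  resources: [deliveryStream.attrArn],
}));

// Forward every log event from the pod log group to Firehose
new logs.CfnSubscriptionFilter(this, 'PodLogSubscription', {
  logGroupName: '/aws/eks/MyCluster/application',
  filterPattern: '', // empty pattern matches all events
  destinationArn: deliveryStream.attrArn,
  roleArn: cwlToFirehoseRole.roleArn,
});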

Infrastructure as Code: AWS CDK Implementation

Let’s explore the key components of our CDK implementation that make this architecture deployable and maintainable.

Configuration-Driven Architecture

The stack uses environment-specific configurations to support multiple deployment scenarios:

// default-values.json
{
  "dev": {
    "opensearchInstanceType": "t3.small.search",
    "opensearchInstanceCount": 1,
    "firehoseBufferSize": 5,
    "logRetentionDays": 7
  },
  "prod": {
    "opensearchInstanceType": "r6g.large.search",
    "opensearchInstanceCount": 3,
    "firehoseBufferSize": 128,
    "logRetentionDays": 365
  }
}

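At synth time the stack can resolve this file against a context key; here is a minimal sketch, assuming a CDK `app` object and the file layout shown above (the `env` context key matches the deploy commands later in the post):

import * as fs from 'fs';

interface EnvConfig {
  opensearchInstanceType: string;
  opensearchInstanceCount: number;
  firehoseBufferSize: number;
  logRetentionDays: number;
}

// Pick the block matching `--context env=...`, defaulting to dev
const envName: string = app.node.tryGetContext('env') ?? 'dev';
const configs: Record<string, EnvConfig> =
  JSON.parse(fs.readFileSync('default-values.json', 'utf8'));
const config = configs[envName];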
This approach enables:

  • Environment consistency with different resource sizing
  • Cost optimization for development environments
  • Production reliability with appropriate redundancy
  • Easy configuration management without code changes

OpenSearch Domain Setup

The OpenSearch domain is configured for high availability and security:

import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as opensearch from 'aws-cdk-lib/aws-opensearchservice';

const opensearchDomain = new opensearch.Domain(this, 'LoggingDomain', {
  version: opensearch.EngineVersion.OPENSEARCH_2_3,
  capacity: {
    masterNodes: config.opensearch.masterNodeCount,
    masterNodeInstanceType: config.opensearch.masterInstanceType,
    dataNodes: config.opensearch.dataNodeCount,
    dataNodeInstanceType: config.opensearch.dataInstanceType,
  },
  // Multi-AZ: spread data nodes across AZs (needs more than one node)
  zoneAwareness: { enabled: config.opensearch.dataNodeCount > 1 },
  ebs: {
    volumeSize: config.opensearch.volumeSize,
    volumeType: ec2.EbsDeviceVolumeType.GP3,
  },
  nodeToNodeEncryption: true,
  encryptionAtRest: { enabled: true },
  enforceHttps: true,
  // Only the Firehose service may write documents into the domain
  accessPolicies: [
    new iam.PolicyStatement({
      principals: [new iam.ServicePrincipal('firehose.amazonaws.com')],
      actions: ['es:ESHttpPost', 'es:ESHttpPut'],
      resources: ['*'],
    }),
  ],
});

Key architectural decisions:

  • Multi-AZ deployment for high availability
  • Encryption at rest and in transit for security compliance
  • Fine-grained access control limiting service access
  • GP3 storage for cost-effective performance

Kinesis Data Firehose Configuration

The Firehose delivery stream handles log transportation with automatic retry and error handling:

import * as kinesisFirehose from 'aws-cdk-lib/aws-kinesisfirehose';

const deliveryStream = new kinesisFirehose.CfnDeliveryStream(this, 'LogDeliveryStream', {
  deliveryStreamType: 'DirectPut',
  deliveryStreamName: `${config.stackName}-logs`,
  amazonopensearchserviceDestinationConfiguration: {
    domainArn: opensearchDomain.domainArn,
    indexName: 'application-logs',
    roleArn: firehoseRole.roleArn,
    // Run every batch through the transformation Lambda before indexing
    processingConfiguration: {
      enabled: true,
      processors: [{
        type: 'Lambda',
        parameters: [{
          parameterName: 'LambdaArn',
          parameterValue: transformFunction.functionArn,
        }],
      }],
    },
    // Trade latency for throughput: deliver when either limit is reached
    bufferingHints: {
      sizeInMBs: config.firehose.bufferSize,
      intervalInSeconds: config.firehose.bufferInterval,
    },
    retryOptions: {
      durationInSeconds: 3600, // retry transient failures for up to an hour
    },
    // Anything OpenSearch rejects lands in S3 instead of being lost
    s3BackupMode: 'FailedDocumentsOnly',
    s3Configuration: {
      roleArn: firehoseRole.roleArn,
      bucketArn: errorBucket.bucketArn,
      prefix: 'failed-logs/',
    },
  },
});

Performance optimizations:

  • Configurable buffering to balance latency and throughput
  • Lambda transformation for data enrichment and formatting
  • Automatic retry with exponential backoff for transient failures
  • S3 backup for failed records to prevent data loss

Log Transformation: Lambda Processing Pipeline

The Lambda function handles log transformation, enrichment, and formatting for optimal OpenSearch indexing:

Log Processing Logic

import base64
import gzip
import json
from datetime import datetime, timezone

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Decode and decompress the CloudWatch Logs payload
        compressed_payload = base64.b64decode(record['data'])
        uncompressed_payload = gzip.decompress(compressed_payload)
        log_data = json.loads(uncompressed_payload)

        # CloudWatch Logs emits a control message when the subscription is
        # created; drop it instead of indexing it
        if log_data.get('messageType') == 'CONTROL_MESSAGE':
            output.append({'recordId': record['recordId'], 'result': 'Dropped'})
            continue

        # Firehose requires exactly one output record per input recordId, so
        # the batched log events are joined into a single payload here. (AWS's
        # cloudwatch-logs-processor blueprint instead re-ingests the extra
        # events so each one becomes its own document.)
        processed_events = [
            process_log_event(log_event, log_data)
            for log_event in log_data['logEvents']
        ]
        payload = '\n'.join(json.dumps(e) for e in processed_events)

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8'),
        })

    return {'records': output}

def process_log_event(log_event, metadata):
    """Process and enrich individual log events"""

    # Extract Kubernetes metadata
    log_group = metadata.get('logGroup', '')
    kubernetes_info = parse_kubernetes_metadata(log_group)

    # Parse application log message
    message = log_event['message']
    parsed_message = parse_application_log(message)

    # Create enriched log document
    return {
        '@timestamp': datetime.fromtimestamp(
            log_event['timestamp'] / 1000, tz=timezone.utc
        ).isoformat(),
        'kubernetes': kubernetes_info,
        'log_level': parsed_message.get('level', 'INFO'),
        'message': parsed_message.get('message', message),
        'request_id': parsed_message.get('request_id'),
        'source': {
            'log_group': log_group,
            'log_stream': metadata.get('logStream', ''),
        },
    }

def parse_application_log(message):
    """Minimal example parser: treat JSON log lines as structured data and
    fall back to the raw text for everything else"""
    try:
        parsed = json.loads(message)
        if isinstance(parsed, dict):
            return parsed
    except ValueError:
        pass
    return {'message': message}

Kubernetes Metadata Extraction

The transformation function extracts valuable Kubernetes context from CloudWatch log groups:

def parse_kubernetes_metadata(log_group):
    """Extract Kubernetes metadata from log group names"""

    # Example: /aws/eks/my-cluster/application/namespace/pod-name
    parts = log_group.split('/')

    if 'eks' in parts:
        return {
            'cluster_name': parts[3] if len(parts) > 3 else 'unknown',
            'namespace': parts[5] if len(parts) > 5 else 'default',
            'pod_name': parts[6] if len(parts) > 6 else 'unknown',
        }

    return {'source': 'application'}

Deployment and Configuration Management

The CDK stack supports flexible deployment scenarios through configuration-driven infrastructure:

Multi-Environment Deployment

Deploy different configurations for various environments:

# Development environment with minimal resources
cdk deploy --all --context env=dev

# Production environment with high availability
cdk deploy --all --context env=prod \
    --context enableVpc=true \
    --context opensearchInstanceCount=3

Selective Log Stream Configuration

Configure specific log groups for OpenSearch ingestion:

# Deploy with specific EKS log groups
cdk deploy --all \
    --context eksPodGroup="/aws/eks/MyCluster/application" \
    --context logRetentionDays=30
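Inside the stack these context values can be read with `tryGetContext`; a minimal sketch, reusing the hypothetical role and stream from the subscription-filter example in the architecture section:

// Wire up a subscription filter only when a log group was passed in
const eksPodGroup = this.node.tryGetContext('eksPodGroup');
if (eksPodGroup) {
  new logs.CfnSubscriptionFilter(this, 'EksPodSubscription', {
    logGroupName: eksPodGroup,
    filterPattern: '',
    destinationArn: deliveryStream.attrArn,
    roleArn: cwlToFirehoseRole.roleArn,
  });
}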

VPC Integration

For enhanced security, deploy OpenSearch within a VPC:

// Enable VPC deployment
const vpcStack = new VpcStack(app, 'LoggingVpcStack', {
  env: { region: 'us-west-2' }
});

const opensearchStack = new OpenSearchStack(app, 'OpenSearchStack', {
  vpc: vpcStack.vpc,
  env: { region: 'us-west-2' }
});
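Inside `OpenSearchStack`, the domain can then be attached to the shared VPC; a hedged sketch showing only the VPC-specific props, with the rest matching the domain definition earlier:

// Place the domain's ENIs in the VPC's private subnets
const opensearchDomain = new opensearch.Domain(this, 'LoggingDomain', {
  version: opensearch.EngineVersion.OPENSEARCH_2_3,
  vpc: props.vpc,
  vpcSubnets: [{ subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS }],
  // ...capacity, EBS, and encryption settings as shown earlier
});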

Monitoring and Observability

A centralized logging platform requires comprehensive monitoring of its own performance:

Key Metrics to Track

// CloudWatch metrics for monitoring the pipeline itself
const logIngestionRate = new cloudwatch.Metric({
  namespace: 'AWS/Firehose', // Firehose metrics live under AWS/Firehose
  metricName: 'IncomingRecords',
  dimensionsMap: {
    DeliveryStreamName: deliveryStream.deliveryStreamName!,
  },
});

// Rejected writes are the closest AWS/ES signal for indexing trouble
const opensearchIndexingErrors = new cloudwatch.Metric({
  namespace: 'AWS/ES', // OpenSearch Service domains still publish under AWS/ES
  metricName: 'ThreadpoolWriteRejected',
  dimensionsMap: {
    DomainName: opensearchDomain.domainName,
    ClientId: this.account, // AWS/ES metrics are also dimensioned by account
  },
});

Alerting Configuration

// Alert on high error rates
const highIndexingErrors = new cloudwatch.Alarm(this, 'HighIndexingErrors', {
  metric: opensearchIndexingErrors,
  threshold: 10,
  evaluationPeriods: 2,
  treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING,
});

// Alert on low log ingestion (potential pipeline issues)
const lowLogIngestion = new cloudwatch.Alarm(this, 'LowLogIngestion', {
  metric: logIngestionRate,
  threshold: 100,
  evaluationPeriods: 3,
  comparisonOperator: cloudwatch.ComparisonOperator.LESS_THAN_THRESHOLD,
});
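These alarms only help if they reach someone; a small sketch (the SNS topic is an assumption) that routes both alarms above to a notification topic:

import * as sns from 'aws-cdk-lib/aws-sns';
import * as cwActions from 'aws-cdk-lib/aws-cloudwatch-actions';

// Notify on-call engineers whenever either pipeline alarm fires
const alertTopic = new sns.Topic(this, 'LoggingAlerts');
highIndexingErrors.addAlarmAction(new cwActions.SnsAction(alertTopic));
lowLogIngestion.addAlarmAction(new cwActions.SnsAction(alertTopic));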

Production Considerations and Best Practices

Cost Optimization Strategies

  1. Right-sizing OpenSearch instances based on actual usage patterns
  2. Implementing log lifecycle policies for automated data archival (see the sketch after this list)
  3. Using reserved instances for predictable workloads
  4. Configuring appropriate buffer sizes to optimize Firehose costs
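As one concrete piece of item 2, here is a minimal sketch of an S3 lifecycle rule on the failed-logs bucket from the Firehose section; the 30-day Glacier transition is an assumed policy, not a prescription:

import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';

// Archive failed-log objects to Glacier after 30 days, then expire them
// once the configured retention window has passed
errorBucket.addLifecycleRule({
  prefix: 'failed-logs/',
  transitions: [{
    storageClass: s3.StorageClass.GLACIER,
    transitionAfter: cdk.Duration.days(30),
  }],
  expiration: cdk.Duration.days(config.logRetentionDays),
});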

Security and Compliance

// Fine-grained access control for human log analysts
const logAnalystRole = new iam.Role(this, 'LogAnalystRole', {
  // Assumed by identities in this account, not by the OpenSearch service
  assumedBy: new iam.AccountRootPrincipal(),
  inlinePolicies: {
    LogSearchPolicy: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ['es:ESHttpGet', 'es:ESHttpPost'],
          resources: [`${opensearchDomain.domainArn}/application-logs/_search`],
        }),
      ],
    }),
  },
});

Disaster Recovery

  1. Cross-region replication for critical log data
  2. Automated backup strategies using S3 lifecycle policies
  3. Infrastructure versioning through CDK and Git
  4. Runbook procedures for common failure scenarios

Real-World Usage Patterns

Application Performance Monitoring

Query examples for common operational scenarios. For instance, find every error from the last hour that carries a request ID, grouped by namespace:

{
  "query": {
    "bool": {
      "must": [
        {"range": {"@timestamp": {"gte": "now-1h"}}},
        {"term": {"log_level": "ERROR"}},
        {"exists": {"field": "request_id"}}
      ]
    }
  },
  "aggs": {
    "error_by_service": {
      "terms": {"field": "kubernetes.namespace"}
    }
  }
}

Debugging Distributed Transactions

Trace requests across multiple services using correlation IDs:

{
  "query": {
    "term": {"request_id": "req-123456789"}
  },
  "sort": [{"@timestamp": "asc"}]
}

Conclusion: Building Reliable Observability

This centralized logging architecture provides a foundation for comprehensive Kubernetes observability that scales with your infrastructure. The combination of OpenSearch’s powerful search capabilities, Kinesis Firehose’s reliable delivery, and CDK’s infrastructure-as-code approach creates a maintainable, cost-effective solution.

Key Takeaways

  • Serverless architecture reduces operational overhead while providing automatic scaling
  • Configuration-driven deployment enables consistent multi-environment management
  • Comprehensive error handling prevents log data loss during system failures
  • Built-in monitoring provides visibility into the logging pipeline itself
  • Cost optimization through right-sizing and lifecycle policies

As your Kubernetes infrastructure grows, this logging platform provides the observability foundation needed for maintaining reliable, performant applications at scale.

The complete implementation is available in the CDK Playground repository, including deployment scripts, configuration examples, and monitoring templates.

Yen