Building an Intelligent Bitcoin Trading System with AWS CDK and ML Models

The cryptocurrency market operates 24/7 with extreme volatility, making manual trading challenging and inefficient. This post explores building a production-ready automated Bitcoin trading system that leverages AWS services, machine learning models, and infrastructure-as-code practices to make intelligent trading decisions based on historical data analysis and real-time price predictions.

The Problem: Automated Crypto Trading at Scale

Building a reliable cryptocurrency trading system presents several unique challenges:

  • Real-Time Data Processing: Bitcoin prices change every second, requiring near-instant analysis
  • Historical Data Management: Storing and analyzing years of price history for pattern recognition
  • ML Model Integration: Leveraging multiple prediction models (Bedrock, HuggingFace, custom APIs)
  • Risk Management: Implementing safeguards against catastrophic losses
  • Event-Driven Architecture: Reacting to new prices and predictions with minimal latency
  • Cost Optimization: Running ML inference efficiently without breaking the bank
  • Audit Trail: Maintaining complete transaction history for compliance and analysis

Architecture Overview: Event-Driven ML Trading Pipeline

Our architecture combines serverless components, managed ML services, and event-driven patterns to create a scalable, cost-effective trading system:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  CoinGecko/     │    │   EventBridge   │    │     Lambda      │
│  Binance API    │ -> │   Scheduler     │ -> │ Price Collector │
│                 │    │ (Every 1 min)   │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                       │
                                                       v
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   DynamoDB      │    │   S3 Bucket     │    │   Timestream    │
│ Historical Data │ <- │ Raw Price Data  │ <- │  Time Series    │
│                 │    │                 │    │     Database    │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                       │
                                                       v
                       ┌────────────────────────────────────────┐
                       │          ML Prediction Layer           │
                       │                                        │
                       │  ┌──────────┐  ┌──────────┐  ┌──────┐ │
                       │  │ Bedrock  │  │HuggingFace│  │ API  │ │
                       │  │  Claude  │  │  Model    │  │Custom│ │
                       │  └──────────┘  └──────────┘  └──────┘ │
                       └────────────────────────────────────────┘
                                         │
                                         v
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   SNS Topic     │    │  Step Functions │    │     Lambda      │
│ Trade Alerts    │ <- │ Trading Logic   │ <- │  Trade Executor │
│                 │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │
                                v
                       ┌─────────────────┐
                       │   DynamoDB      │
                       │  Trade History  │
                       │                 │
                       └─────────────────┘

Data Flow Breakdown:

  1. Price Collection: EventBridge triggers Lambda to fetch Bitcoin prices every minute
  2. Storage: Raw data stored in S3, processed data in DynamoDB and Timestream
  3. Historical Analysis: Lambda analyzes patterns from historical data
  4. ML Prediction: Multiple models (Bedrock, HuggingFace, custom) generate predictions
  5. Decision Engine: Step Functions orchestrates trading logic based on predictions
  6. Trade Execution: Lambda executes buy/sell orders via exchange APIs
  7. Notification: SNS alerts stakeholders of trade executions
  8. Audit: All trades logged in DynamoDB for compliance and analysis

Infrastructure as Code: AWS CDK Implementation

Let’s build this system step-by-step using AWS CDK with TypeScript.

Project Structure

bitcoin-trading-cdk/
├── lib/
│   ├── bitcoin-trading-stack.ts
│   ├── constructs/
│   │   ├── data-ingestion-construct.ts
│   │   ├── ml-prediction-construct.ts
│   │   ├── trading-engine-construct.ts
│   │   └── monitoring-construct.ts
│   └── lambda/
│       ├── price-collector/
│       ├── ml-predictor/
│       ├── trade-executor/
│       └── risk-manager/
├── config/
│   └── trading-config.json
└── bin/
    └── bitcoin-trading-app.ts

Configuration Management

Environment-specific configuration for risk management and cost control; intervals are in minutes and trade amounts are in USD:

// config/trading-config.json
{
  "dev": {
    "priceCollectionInterval": 5,
    "maxTradeAmount": 100,
    "enableRealTrading": false,
    "mlModels": ["bedrock"],
    "riskThreshold": 0.7
  },
  "prod": {
    "priceCollectionInterval": 1,
    "maxTradeAmount": 10000,
    "enableRealTrading": true,
    "mlModels": ["bedrock", "huggingface", "custom"],
    "riskThreshold": 0.85
  }
}
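
The app entry point can load this file and select a profile from a CDK context flag, then pass the values down to the constructs. A minimal sketch follows; the BitcoinTradingStack prop names are assumed here and simply mirror the construct props used later in this post:

// bin/bitcoin-trading-app.ts (sketch)
import * as cdk from 'aws-cdk-lib';
import * as fs from 'fs';
import { BitcoinTradingStack } from '../lib/bitcoin-trading-stack';

const app = new cdk.App();

// Select the environment profile via `cdk deploy --context env=dev|prod`
const envName = app.node.tryGetContext('env') ?? 'dev';
const config = JSON.parse(fs.readFileSync('config/trading-config.json', 'utf8'))[envName];

new BitcoinTradingStack(app, `BitcoinTrading-${envName}`, {
  collectionInterval: config.priceCollectionInterval,
  maxTradeAmount: config.maxTradeAmount,
  enableRealTrading: config.enableRealTrading,
  enabledModels: config.mlModels,
  riskThreshold: config.riskThreshold,
});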

1. Data Ingestion Layer

The data ingestion construct handles Bitcoin price collection and historical storage:

// lib/constructs/data-ingestion-construct.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as timestream from 'aws-cdk-lib/aws-timestream';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';

export interface DataIngestionProps {
  collectionInterval: number;  // minutes
}

export class DataIngestionConstruct extends Construct {
  public readonly priceTable: dynamodb.Table;
  public readonly priceBucket: s3.Bucket;
  public readonly timestreamDb: timestream.CfnDatabase;

  constructor(scope: Construct, id: string, props: DataIngestionProps) {
    super(scope, id);

    // S3 bucket for raw price data
    this.priceBucket = new s3.Bucket(this, 'PriceDataBucket', {
      versioned: true,
      lifecycleRules: [{
        expiration: cdk.Duration.days(90),
        transitions: [{
          storageClass: s3.StorageClass.GLACIER,
          transitionAfter: cdk.Duration.days(30),
        }],
      }],
      encryption: s3.BucketEncryption.S3_MANAGED,
    });

    // DynamoDB table for queryable historical data
    this.priceTable = new dynamodb.Table(this, 'BitcoinPriceTable', {
      partitionKey: { name: 'symbol', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      timeToLiveAttribute: 'ttl',
      pointInTimeRecovery: true,
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    });

    // Timestream for time-series analysis
    this.timestreamDb = new timestream.CfnDatabase(this, 'PriceTimestream', {
      databaseName: 'bitcoin-prices',
    });

    const timestreamTableName = 'price-history';
    new timestream.CfnTable(this, 'PriceTimestreamTable', {
      databaseName: this.timestreamDb.ref,
      tableName: timestreamTableName,
      retentionProperties: {
        MemoryStoreRetentionPeriodInHours: '24',
        MagneticStoreRetentionPeriodInDays: '365',
      },
    });

    // Lambda for price collection
    const priceCollectorFn = new lambda.Function(this, 'PriceCollector', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/price-collector'),
      timeout: cdk.Duration.seconds(60),
      memorySize: 512,
      environment: {
        PRICE_TABLE: this.priceTable.tableName,
        PRICE_BUCKET: this.priceBucket.bucketName,
        TIMESTREAM_DB: this.timestreamDb.ref,
        // Pass the plain table name so the Lambda's WriteRecords call gets
        // 'price-history' rather than a CloudFormation token
        TIMESTREAM_TABLE: timestreamTableName,
      },
    });

    // Grant permissions
    this.priceTable.grantWriteData(priceCollectorFn);
    this.priceBucket.grantWrite(priceCollectorFn);

    // Timestream has no grant helpers, so attach write permissions explicitly
    priceCollectorFn.addToRolePolicy(new iam.PolicyStatement({
      actions: ['timestream:WriteRecords', 'timestream:DescribeEndpoints'],
      resources: ['*'],
    }));

    // EventBridge rule to trigger price collection
    const collectionRule = new events.Rule(this, 'PriceCollectionRule', {
      schedule: events.Schedule.rate(cdk.Duration.minutes(props.collectionInterval)),
      description: 'Trigger Bitcoin price collection',
    });

    collectionRule.addTarget(new targets.LambdaFunction(priceCollectorFn));
  }
}
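
Because the schedule and table settings differ per environment, it is worth asserting on the synthesized template before deploying. A small sketch using aws-cdk-lib/assertions with Jest (the test file path is illustrative):

// test/data-ingestion.test.ts (sketch)
import * as cdk from 'aws-cdk-lib';
import { Template } from 'aws-cdk-lib/assertions';
import { DataIngestionConstruct } from '../lib/constructs/data-ingestion-construct';

test('price collection runs on the configured schedule', () => {
  const stack = new cdk.Stack();
  new DataIngestionConstruct(stack, 'DataIngestion', { collectionInterval: 5 });

  const template = Template.fromStack(stack);

  // EventBridge rule reflects the 5-minute dev interval
  template.hasResourceProperties('AWS::Events::Rule', {
    ScheduleExpression: 'rate(5 minutes)',
  });

  // Price table keeps point-in-time recovery enabled
  template.hasResourceProperties('AWS::DynamoDB::Table', {
    PointInTimeRecoverySpecification: { PointInTimeRecoveryEnabled: true },
  });
});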

Price Collection Lambda Implementation

# lib/lambda/price-collector/index.py
import json
import os
import statistics
import boto3
import requests  # bundle with the deployment package or provide via a Lambda layer
from datetime import datetime
from decimal import Decimal

dynamodb = boto3.resource('dynamodb')
s3_client = boto3.client('s3')
timestream_client = boto3.client('timestream-write')

PRICE_TABLE = os.environ['PRICE_TABLE']
PRICE_BUCKET = os.environ['PRICE_BUCKET']
TIMESTREAM_DB = os.environ['TIMESTREAM_DB']
TIMESTREAM_TABLE = os.environ['TIMESTREAM_TABLE']

def handler(event, context):
    """Collect Bitcoin price from multiple sources"""

    # Fetch from multiple sources for reliability
    sources = [s for s in (
        fetch_coingecko_price(),
        fetch_binance_price(),
        fetch_coinbase_price(),
    ) if s]

    if not sources:
        return {'statusCode': 502, 'body': json.dumps({'error': 'All price sources failed'})}

    # Use the median price to dampen outliers from any single exchange
    median_price = statistics.median(s['price'] for s in sources)

    timestamp = int(datetime.utcnow().timestamp())

    # Prepare data (volume / market cap come from the first source that returned them)
    price_data = {
        'symbol': 'BTC-USD',
        'timestamp': timestamp,
        'price': Decimal(str(median_price)),
        'volume_24h': Decimal(str(sources[0].get('volume', 0))),
        'market_cap': Decimal(str(sources[0].get('market_cap', 0))),
        'price_change_24h': Decimal(str(sources[0].get('price_change', 0))),
        'sources': [s['source'] for s in sources],
        'ttl': timestamp + (90 * 24 * 60 * 60),  # 90 days retention
    }

    # Store in DynamoDB
    table = dynamodb.Table(PRICE_TABLE)
    table.put_item(Item=price_data)

    # Store raw data in S3
    s3_key = f"raw-prices/{datetime.utcnow().strftime('%Y/%m/%d')}/{timestamp}.json"
    s3_client.put_object(
        Bucket=PRICE_BUCKET,
        Key=s3_key,
        Body=json.dumps(sources, default=str),
    )

    # Write to Timestream for time-series analysis
    write_to_timestream(price_data)

    return {
        'statusCode': 200,
        'body': json.dumps({
            'price': float(median_price),
            'timestamp': timestamp,
        })
    }

def fetch_coingecko_price():
    """Fetch price from CoinGecko API"""
    try:
        url = "https://api.coingecko.com/api/v3/simple/price"
        params = {
            'ids': 'bitcoin',
            'vs_currencies': 'usd',
            'include_24hr_vol': 'true',
            'include_24hr_change': 'true',
            'include_market_cap': 'true'
        }
        response = requests.get(url, params=params, timeout=10)
        data = response.json()['bitcoin']

        return {
            'source': 'coingecko',
            'price': data['usd'],
            'volume': data.get('usd_24h_vol', 0),
            'market_cap': data.get('usd_market_cap', 0),
            'price_change': data.get('usd_24h_change', 0),
        }
    except Exception as e:
        print(f"CoinGecko fetch error: {e}")
        return None

def fetch_binance_price():
    """Fetch price from Binance API"""
    try:
        url = "https://api.binance.com/api/v3/ticker/24hr"
        params = {'symbol': 'BTCUSDT'}
        response = requests.get(url, params=params, timeout=10)
        data = response.json()

        return {
            'source': 'binance',
            'price': float(data['lastPrice']),
            'volume': float(data['volume']),
            'price_change': float(data['priceChangePercent']),
        }
    except Exception as e:
        print(f"Binance fetch error: {e}")
        return None

def fetch_coinbase_price():
    """Fetch spot price from Coinbase API"""
    try:
        url = "https://api.coinbase.com/v2/prices/BTC-USD/spot"
        response = requests.get(url, timeout=10)
        data = response.json()['data']

        return {
            'source': 'coinbase',
            'price': float(data['amount']),
        }
    except Exception as e:
        print(f"Coinbase fetch error: {e}")
        return None

def write_to_timestream(price_data):
    """Write price data to Timestream for time-series analysis"""
    records = [{
        'Time': str(price_data['timestamp'] * 1000),  # milliseconds
        'TimeUnit': 'MILLISECONDS',
        'Dimensions': [
            {'Name': 'symbol', 'Value': price_data['symbol']},
        ],
        'MeasureName': 'price_metrics',
        'MeasureValueType': 'MULTI',
        'MeasureValues': [
            {'Name': 'price', 'Value': str(price_data['price']), 'Type': 'DOUBLE'},
            {'Name': 'volume', 'Value': str(price_data['volume_24h']), 'Type': 'DOUBLE'},
            {'Name': 'market_cap', 'Value': str(price_data['market_cap']), 'Type': 'DOUBLE'},
        ],
    }]

    try:
        timestream_client.write_records(
            DatabaseName=TIMESTREAM_DB,
            TableName=TIMESTREAM_TABLE,
            Records=records,
        )
    except Exception as e:
        print(f"Timestream write error: {e}")
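
With prices landing in Timestream, longer-range aggregates such as hourly candles or moving averages can come from a single SQL query rather than scanning DynamoDB. A small sketch using the timestream-query client; the aggregation itself is just an example:

# scripts/query_price_history.py (sketch)
import boto3

query_client = boto3.client('timestream-query')

# Hourly aggregates over the last 7 days of collected prices
QUERY = """
SELECT bin(time, 1h) AS hour,
       avg(price) AS avg_price,
       max(price) AS high,
       min(price) AS low
FROM "bitcoin-prices"."price-history"
WHERE symbol = 'BTC-USD' AND time > ago(7d)
GROUP BY bin(time, 1h)
ORDER BY hour
"""

def fetch_hourly_aggregates():
    # A 7-day hourly rollup fits in one response page; larger results return a NextToken
    response = query_client.query(QueryString=QUERY)
    for row in response['Rows']:
        yield [col.get('ScalarValue') for col in row['Data']]

if __name__ == '__main__':
    for hour, avg_price, high, low in fetch_hourly_aggregates():
        print(hour, avg_price, high, low)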

2. ML Prediction Layer

The prediction layer integrates multiple ML models for robust price forecasting:

// lib/constructs/ml-prediction-construct.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import { Construct } from 'constructs';

export interface MLPredictionProps {
  priceTableName: string;
  enabledModels: string[];  // ['bedrock', 'huggingface', 'custom']
}

export class MLPredictionConstruct extends Construct {
  public readonly predictorFunction: lambda.Function;

  constructor(scope: Construct, id: string, props: MLPredictionProps) {
    super(scope, id);

    // Lambda Layer for ML libraries
    const mlLayer = new lambda.LayerVersion(this, 'MLLibrariesLayer', {
      code: lambda.Code.fromAsset('lib/lambda/layers/ml-libs'),
      compatibleRuntimes: [lambda.Runtime.PYTHON_3_11],
      description: 'ML libraries: numpy, pandas, scikit-learn',
    });

    // Prediction Lambda
    this.predictorFunction = new lambda.Function(this, 'MLPredictor', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/ml-predictor'),
      timeout: cdk.Duration.seconds(300),
      memorySize: 3008,  // Generous memory for in-Lambda inference
      layers: [mlLayer],
      environment: {
        PRICE_TABLE: props.priceTableName,
        ENABLED_MODELS: JSON.stringify(props.enabledModels),
        BEDROCK_MODEL_ID: 'anthropic.claude-3-sonnet-20240229-v1:0',
      },
    });

    // Grant Bedrock access
    if (props.enabledModels.includes('bedrock')) {
      this.predictorFunction.addToRolePolicy(new iam.PolicyStatement({
        actions: [
          'bedrock:InvokeModel',
          'bedrock:InvokeModelWithResponseStream',
        ],
        resources: ['*'],
      }));
    }

    // Grant SageMaker access for HuggingFace models
    if (props.enabledModels.includes('huggingface')) {
      this.predictorFunction.addToRolePolicy(new iam.PolicyStatement({
        actions: [
          'sagemaker:InvokeEndpoint',
        ],
        resources: ['*'],
      }));
    }

    // Grant DynamoDB read access
    const priceTable = dynamodb.Table.fromTableName(this, 'PriceTable', props.priceTableName);
    priceTable.grantReadData(this.predictorFunction);
  }
}
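
The construct only grants sagemaker:InvokeEndpoint; the bitcoin-price-predictor endpoint itself must already exist. One way to stand it up is the SageMaker Python SDK's HuggingFace support. The model ID, task, instance type, and DLC versions below are placeholders, and a model actually fine-tuned on your own price data would replace them:

# scripts/deploy_hf_endpoint.py (sketch)
import sagemaker
from sagemaker.huggingface import HuggingFaceModel

role = sagemaker.get_execution_role()  # or pass an explicit IAM role ARN

# Placeholder model: swap in a time-series forecasting model of your own
huggingface_model = HuggingFaceModel(
    env={
        'HF_MODEL_ID': 'your-org/bitcoin-forecaster',  # hypothetical model ID
        'HF_TASK': 'text-classification',              # placeholder task
    },
    role=role,
    transformers_version='4.26',  # must match an available HuggingFace DLC
    pytorch_version='1.13',
    py_version='py39',
)

# Endpoint name must match what the predictor Lambda calls
huggingface_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    endpoint_name='bitcoin-price-predictor',
)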

ML Prediction Lambda Implementation

# lib/lambda/ml-predictor/index.py
import json
import os
import boto3
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict

bedrock_runtime = boto3.client('bedrock-runtime')
sagemaker_runtime = boto3.client('sagemaker-runtime')
dynamodb = boto3.resource('dynamodb')

PRICE_TABLE = os.environ['PRICE_TABLE']
ENABLED_MODELS = json.loads(os.environ['ENABLED_MODELS'])
BEDROCK_MODEL_ID = os.environ['BEDROCK_MODEL_ID']

def handler(event, context):
    """Generate price predictions using multiple ML models"""

    # Fetch historical data
    historical_data = fetch_historical_prices(days=30)

    if len(historical_data) < 100:
        return {'error': 'Insufficient historical data'}

    # Generate predictions from all enabled models
    predictions = {}

    if 'bedrock' in ENABLED_MODELS:
        predictions['bedrock'] = predict_with_bedrock(historical_data)

    if 'huggingface' in ENABLED_MODELS:
        predictions['huggingface'] = predict_with_huggingface(historical_data)

    if 'custom' in ENABLED_MODELS:
        predictions['custom'] = predict_with_custom_model(historical_data)

    # Ensemble predictions for robustness
    final_prediction = ensemble_predictions(predictions)

    # Returned as a plain object: the Step Functions workflow consumes this payload
    # directly, so no API-Gateway-style statusCode/body wrapping is needed here
    return {
        'current_price': float(historical_data['price'].iloc[-1]),
        'predicted_price': final_prediction['predicted_price'] if final_prediction else None,
        'direction': final_prediction['direction'] if final_prediction else 'stable',
        'confidence': calculate_confidence(predictions),
        'individual_predictions': predictions,
        'timestamp': int(datetime.utcnow().timestamp()),
    }

def fetch_historical_prices(days: int = 30) -> pd.DataFrame:
    """Fetch historical price data from DynamoDB"""
    table = dynamodb.Table(PRICE_TABLE)

    start_timestamp = int((datetime.utcnow() - timedelta(days=days)).timestamp())

    # Note: query returns at most 1 MB per page; paginate with LastEvaluatedKey
    # to cover the full window in production
    response = table.query(
        KeyConditionExpression='symbol = :symbol AND #ts >= :start_ts',
        ExpressionAttributeNames={'#ts': 'timestamp'},
        ExpressionAttributeValues={
            ':symbol': 'BTC-USD',
            ':start_ts': start_timestamp,
        },
        ScanIndexForward=True,  # Ascending order
    )

    df = pd.DataFrame(response['Items'])
    df['price'] = df['price'].astype(float)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
    df = df.sort_values('timestamp')

    return df

def predict_with_bedrock(data: pd.DataFrame) -> Dict:
    """Use AWS Bedrock Claude for price prediction"""

    # Prepare context for Claude
    recent_prices = data.tail(50)['price'].tolist()
    price_changes = data['price_change_24h'].astype(float).tail(10).tolist()

    prompt = f"""You are a Bitcoin market analyst. Based on the following data, predict the Bitcoin price movement for the next hour.

Recent prices (last 50 data points): {recent_prices}
Recent 24h price changes: {price_changes}

Current price: ${recent_prices[-1]:.2f}

Provide your analysis in JSON format:
{{
    "predicted_price": <number>,
    "direction": "<up|down|stable>",
    "confidence": <0-1>,
    "reasoning": "<brief explanation>"
}}"""

    try:
        response = bedrock_runtime.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1000,
                "messages": [{
                    "role": "user",
                    "content": prompt
                }]
            })
        )

        result = json.loads(response['body'].read())
        prediction_text = result['content'][0]['text']

        # The model may wrap the JSON in prose; extract the first JSON object
        prediction = json.loads(
            prediction_text[prediction_text.find('{'):prediction_text.rfind('}') + 1]
        )

        return {
            'model': 'bedrock-claude',
            'predicted_price': prediction['predicted_price'],
            'direction': prediction['direction'],
            'confidence': prediction['confidence'],
            'reasoning': prediction['reasoning'],
        }

    except Exception as e:
        print(f"Bedrock prediction error: {e}")
        return None

def predict_with_huggingface(data: pd.DataFrame) -> Dict:
    """Use HuggingFace model deployed on SageMaker"""

    # Prepare features for time-series model
    features = prepare_time_series_features(data)

    try:
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName='bitcoin-price-predictor',
            ContentType='application/json',
            Body=json.dumps({'instances': features.tolist()})
        )

        result = json.loads(response['Body'].read())

        current_price = float(data['price'].iloc[-1])
        predicted_price = float(result['predictions'][0])

        return {
            'model': 'huggingface-timeseries',
            'predicted_price': predicted_price,
            'direction': 'up' if predicted_price > current_price else 'down',
            'confidence': result.get('confidence', 0.5),
        }

    except Exception as e:
        print(f"HuggingFace prediction error: {e}")
        return None

def prepare_time_series_features(data: pd.DataFrame, window: int = 50) -> np.ndarray:
    """Build a normalized price window as input for the time-series endpoint"""
    prices = data['price'].tail(window).to_numpy(dtype=float)
    normalized = (prices - prices.mean()) / (prices.std() + 1e-9)
    return normalized.reshape(1, -1)

def predict_with_custom_model(data: pd.DataFrame) -> Dict:
    """Lightweight custom predictor based on moving averages and momentum"""

    # Simple moving average + momentum-based prediction
    prices = data['price'].values

    # Technical indicators
    sma_20 = np.mean(prices[-20:])
    sma_50 = np.mean(prices[-50:])
    momentum = (prices[-1] - prices[-10]) / prices[-10]

    # Simple prediction logic
    if sma_20 > sma_50 and momentum > 0:
        direction = 'up'
        predicted_price = prices[-1] * (1 + momentum * 0.5)
    elif sma_20 < sma_50 and momentum < 0:
        direction = 'down'
        predicted_price = prices[-1] * (1 + momentum * 0.5)
    else:
        direction = 'stable'
        predicted_price = prices[-1]

    return {
        'model': 'custom-technical-analysis',
        'predicted_price': float(predicted_price),
        'direction': direction,
        'confidence': 0.7,
        'indicators': {
            'sma_20': float(sma_20),
            'sma_50': float(sma_50),
            'momentum': float(momentum),
        }
    }

def ensemble_predictions(predictions: Dict) -> Dict:
    """Combine multiple predictions using weighted ensemble"""

    valid_predictions = [p for p in predictions.values() if p is not None]

    if not valid_predictions:
        return None

    # Weight predictions by confidence
    total_confidence = sum(p['confidence'] for p in valid_predictions)
    weighted_price = sum(
        p['predicted_price'] * p['confidence']
        for p in valid_predictions
    ) / total_confidence

    # Determine consensus direction
    directions = [p['direction'] for p in valid_predictions]
    consensus_direction = max(set(directions), key=directions.count)

    return {
        'predicted_price': weighted_price,
        'direction': consensus_direction,
        'num_models': len(valid_predictions),
    }

def calculate_confidence(predictions: Dict) -> float:
    """Calculate overall confidence based on model agreement"""

    valid_predictions = [p for p in predictions.values() if p is not None]

    if not valid_predictions:
        return 0.0

    # Check direction agreement
    directions = [p['direction'] for p in valid_predictions]
    agreement_ratio = directions.count(max(set(directions), key=directions.count)) / len(directions)

    # Average confidence weighted by agreement
    avg_confidence = np.mean([p['confidence'] for p in valid_predictions])

    return float(avg_confidence * agreement_ratio)

3. Trading Engine with Risk Management

Step Functions orchestrates the trading logic with built-in safety checks:

// lib/constructs/trading-engine-construct.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import { Construct } from 'constructs';

export interface TradingEngineProps {
  predictorFunction: lambda.IFunction;
  maxTradeAmount: number;
  riskThreshold: number;
  enableRealTrading: boolean;
}

export class TradingEngineConstruct extends Construct {
  public readonly tradeTable: dynamodb.Table;
  public readonly alertTopic: sns.Topic;

  constructor(scope: Construct, id: string, props: TradingEngineProps) {
    super(scope, id);

    // Trade history table
    this.tradeTable = new dynamodb.Table(this, 'TradeHistory', {
      partitionKey: { name: 'trade_id', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    });

    // SNS topic for trade alerts
    this.alertTopic = new sns.Topic(this, 'TradeAlerts', {
      displayName: 'Bitcoin Trading Alerts',
    });

    // Risk manager Lambda
    const riskManagerFn = new lambda.Function(this, 'RiskManager', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/risk-manager'),
      timeout: cdk.Duration.seconds(30),
      environment: {
        TRADE_TABLE: this.tradeTable.tableName,
        MAX_TRADE_AMOUNT: props.maxTradeAmount.toString(),
        RISK_THRESHOLD: props.riskThreshold.toString(),
      },
    });

    this.tradeTable.grantReadData(riskManagerFn);

    // Trade executor Lambda
    const tradeExecutorFn = new lambda.Function(this, 'TradeExecutor', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/trade-executor'),
      timeout: cdk.Duration.seconds(60),
      environment: {
        TRADE_TABLE: this.tradeTable.tableName,
        ALERT_TOPIC_ARN: this.alertTopic.topicArn,
        ENABLE_REAL_TRADING: props.enableRealTrading.toString(),
      },
    });

    this.tradeTable.grantWriteData(tradeExecutorFn);
    this.alertTopic.grantPublish(tradeExecutorFn);

    // Step Functions workflow. Each Lambda result is appended to the state
    // via resultPath instead of replacing it, so the trade executor receives
    // both the prediction and the risk analysis it expects.
    const getPrediction = new tasks.LambdaInvoke(this, 'GetPrediction', {
      lambdaFunction: props.predictorFunction,
      payloadResponseOnly: true,
      resultPath: '$.prediction',
    });

    const checkRisk = new tasks.LambdaInvoke(this, 'CheckRisk', {
      lambdaFunction: riskManagerFn,
      payloadResponseOnly: true,
      resultPath: '$.risk_analysis',
    });

    const executeTrade = new tasks.LambdaInvoke(this, 'ExecuteTrade', {
      lambdaFunction: tradeExecutorFn,
      payloadResponseOnly: true,
    });

    const riskCheckPassed = new sfn.Choice(this, 'RiskCheckPassed')
      .when(sfn.Condition.booleanEquals('$.risk_analysis.approved', true), executeTrade)
      .otherwise(new sfn.Succeed(this, 'TradeRejected'));

    const definition = getPrediction
      .next(checkRisk)
      .next(riskCheckPassed);

    const tradingStateMachine = new sfn.StateMachine(this, 'TradingStateMachine', {
      definition,
      timeout: cdk.Duration.minutes(5),
    });

    // EventBridge rule to trigger trading workflow
    const tradingRule = new events.Rule(this, 'TradingTrigger', {
      schedule: events.Schedule.rate(cdk.Duration.minutes(5)),
      description: 'Trigger trading analysis and execution',
    });

    tradingRule.addTarget(new targets.SfnStateMachine(tradingStateMachine));
  }
}
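
Risk Manager Implementation (Sketch)

The workflow's CheckRisk step calls a risk manager Lambda that is not shown elsewhere in this post. Below is a minimal sketch; the approved / reason / approved_amount fields and the daily trade limit are assumptions about one reasonable shape for it, not a definitive implementation:

# lib/lambda/risk-manager/index.py (sketch)
import os
import boto3
from datetime import datetime, timedelta
from boto3.dynamodb.conditions import Attr

dynamodb = boto3.resource('dynamodb')

TRADE_TABLE = os.environ['TRADE_TABLE']
MAX_TRADE_AMOUNT = float(os.environ['MAX_TRADE_AMOUNT'])
RISK_THRESHOLD = float(os.environ['RISK_THRESHOLD'])

def handler(event, context):
    """Approve or reject a trade based on prediction confidence and recent activity"""
    prediction = event.get('prediction', {})
    confidence = float(prediction.get('confidence', 0))

    # Reject low-confidence predictions outright
    if confidence < RISK_THRESHOLD:
        return {
            'approved': False,
            'reason': f'Confidence {confidence:.2f} below threshold {RISK_THRESHOLD}',
        }

    # Count trades executed in the last 24 hours to throttle overtrading
    table = dynamodb.Table(TRADE_TABLE)
    since = int((datetime.utcnow() - timedelta(hours=24)).timestamp())
    recent = table.scan(
        FilterExpression=Attr('timestamp').gte(since),
        Select='COUNT',
    )
    if recent['Count'] >= 10:
        return {'approved': False, 'reason': 'Daily trade limit reached'}

    # Scale the position size with confidence, capped by the configured maximum
    approved_amount = round(MAX_TRADE_AMOUNT * confidence, 2)

    return {
        'approved': True,
        'reason': 'Risk checks passed',
        'approved_amount': approved_amount,
    }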

Trade Execution with Safety Checks

# lib/lambda/trade-executor/index.py
import json
import os
import uuid
import boto3
from datetime import datetime
from decimal import Decimal

dynamodb = boto3.resource('dynamodb')
sns_client = boto3.client('sns')

TRADE_TABLE = os.environ['TRADE_TABLE']
ALERT_TOPIC_ARN = os.environ['ALERT_TOPIC_ARN']
ENABLE_REAL_TRADING = os.environ['ENABLE_REAL_TRADING'].lower() == 'true'

def handler(event, context):
    """Execute trade based on prediction and risk assessment"""

    prediction = event['prediction']
    risk_analysis = event['risk_analysis']

    if not risk_analysis['approved']:
        return {
            'trade_executed': False,
            'reason': risk_analysis['reason']
        }

    # Determine trade action
    current_price = prediction['current_price']
    predicted_price = prediction['predicted_price']
    confidence = prediction['confidence']

    # Trading logic
    price_diff_pct = (predicted_price - current_price) / current_price * 100

    if price_diff_pct > 2 and confidence > 0.75:
        action = 'BUY'
        amount = calculate_position_size(risk_analysis, predicted_price)
    elif price_diff_pct < -2 and confidence > 0.75:
        action = 'SELL'
        amount = calculate_position_size(risk_analysis, predicted_price)
    else:
        action = 'HOLD'
        amount = 0

    if action == 'HOLD':
        return {
            'trade_executed': False,
            'reason': 'No significant trading signal'
        }

    # Execute trade
    if ENABLE_REAL_TRADING:
        trade_result = execute_real_trade(action, amount, current_price)
    else:
        trade_result = simulate_trade(action, amount, current_price)

    # Log trade
    log_trade(action, amount, current_price, predicted_price, trade_result)

    # Send alert
    send_trade_alert(action, amount, current_price, trade_result)

    return {
        'trade_executed': True,
        'action': action,
        'amount': amount,
        'price': current_price,
        'trade_id': trade_result['trade_id']
    }

def calculate_position_size(risk_analysis, predicted_price):
    """Use the budget approved by the risk manager as the position size (USD)"""
    return float(risk_analysis.get('approved_amount', 0))

def execute_real_trade(action, amount, price):
    """Execute trade on an exchange (e.g., Binance, Coinbase)"""
    # Integrate with your exchange's order API here; credentials should come
    # from Secrets Manager rather than plain environment variables
    raise NotImplementedError('Exchange integration is left to the reader')

def simulate_trade(action, amount, price):
    """Simulate trade for testing"""
    return {
        'trade_id': str(uuid.uuid4()),
        'simulated': True,
        'status': 'filled'
    }

def log_trade(action, amount, price, predicted_price, result):
    """Log trade to DynamoDB"""
    table = dynamodb.Table(TRADE_TABLE)

    timestamp = int(datetime.utcnow().timestamp())

    table.put_item(Item={
        'trade_id': result['trade_id'],
        'timestamp': timestamp,
        'action': action,
        'amount': Decimal(str(amount)),
        'price': Decimal(str(price)),
        'predicted_price': Decimal(str(predicted_price)),
        'simulated': result.get('simulated', False),
        'status': result['status'],
    })

def send_trade_alert(action, amount, price, result):
    """Send SNS notification"""
    message = f"""
Bitcoin Trade Executed

Action: {action}
Amount: ${amount:.2f}
Price: ${price:.2f}
Trade ID: {result['trade_id']}
Status: {result['status']}
{'[SIMULATED]' if result.get('simulated') else '[REAL]'}
    """

    sns_client.publish(
        TopicArn=ALERT_TOPIC_ARN,
        Subject=f'Bitcoin Trade Alert: {action}',
        Message=message
    )
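
Before wiring the state machine to real money, the executor can be exercised against a hand-written event. The numbers, table name, and topic ARN below are arbitrary placeholders; run it from the trade-executor directory, and note that the simulated path still writes to the real DynamoDB table and SNS topic you point it at:

# test_trade_executor_locally.py (sketch)
import os

# Simulated-trading configuration; the table and topic must exist if run against AWS
os.environ.update({
    'TRADE_TABLE': 'TradeHistory-dev',
    'ALERT_TOPIC_ARN': 'arn:aws:sns:us-east-1:123456789012:trade-alerts-dev',
    'ENABLE_REAL_TRADING': 'false',
})

import index  # the trade executor handler above

event = {
    'prediction': {
        'current_price': 64000.0,
        'predicted_price': 66000.0,   # roughly +3% -> should trigger a BUY
        'confidence': 0.82,
    },
    'risk_analysis': {
        'approved': True,
        'reason': 'Risk checks passed',
        'approved_amount': 250.0,
    },
}

print(index.handler(event, context=None))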

Deployment and Configuration

Deploy the complete trading system:

# Install dependencies
npm install

# Deploy to development (simulated trading)
cdk deploy --all --context env=dev

# Deploy to production (real trading - use with caution!)
cdk deploy --all --context env=prod \
    --context enableRealTrading=true \
    --context maxTradeAmount=10000

Monitoring and Performance Tracking

Track system performance with CloudWatch dashboards:

// Add monitoring construct
new MonitoringConstruct(this, 'Monitoring', {
  predictorFunction,
  tradeTable,
  metrics: [
    'PredictionAccuracy',
    'TradeSuccessRate',
    'PortfolioValue',
    'RiskScore'
  ]
});
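
The monitoring construct itself is not defined elsewhere in this post; a minimal sketch might look like the following. The props mirror the snippet above, the BitcoinTrading namespace is an assumption, and the custom business metrics would need to be published by the Lambdas themselves:

// lib/constructs/monitoring-construct.ts (sketch)
import * as cdk from 'aws-cdk-lib';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import { Construct } from 'constructs';

export interface MonitoringProps {
  predictorFunction: lambda.IFunction;
  tradeTable: dynamodb.ITable;
  metrics: string[];  // custom metric names published by the Lambdas
}

export class MonitoringConstruct extends Construct {
  constructor(scope: Construct, id: string, props: MonitoringProps) {
    super(scope, id);

    // Alarm on predictor failures: a broken model means no trading signals
    new cloudwatch.Alarm(this, 'PredictorErrorAlarm', {
      metric: props.predictorFunction.metricErrors({ period: cdk.Duration.minutes(5) }),
      threshold: 1,
      evaluationPeriods: 1,
    });

    // Dashboard with predictor health, trade-table activity, and custom trading metrics
    const dashboard = new cloudwatch.Dashboard(this, 'TradingDashboard');
    dashboard.addWidgets(
      new cloudwatch.GraphWidget({
        title: 'Predictor invocations & errors',
        left: [props.predictorFunction.metricInvocations(), props.predictorFunction.metricErrors()],
      }),
      new cloudwatch.GraphWidget({
        title: 'Trade history writes',
        left: [props.tradeTable.metricConsumedWriteCapacityUnits()],
      }),
      new cloudwatch.GraphWidget({
        title: 'Trading metrics',
        left: props.metrics.map((name) =>
          new cloudwatch.Metric({ namespace: 'BitcoinTrading', metricName: name })),
      }),
    );
  }
}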

Security Best Practices

  1. API Key Management: Store exchange API keys in AWS Secrets Manager (a minimal wiring sketch follows this list)
  2. Encryption: Enable encryption for all data at rest and in transit
  3. IAM Policies: Follow least-privilege principle for all Lambda functions
  4. VPC Isolation: Deploy sensitive components in private subnets
  5. Rate Limiting: Implement throttling to prevent excessive trading
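
A minimal sketch for the first item, assuming a secret named bitcoin-trading/exchange-api-keys that is created out-of-band and readable only by the trade executor:

// lib/constructs/trading-engine-construct.ts (additional sketch)
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';

// Reference an existing secret holding the exchange API key pair (name is illustrative)
const exchangeKeys = secretsmanager.Secret.fromSecretNameV2(
  this, 'ExchangeApiKeys', 'bitcoin-trading/exchange-api-keys');

// Only the trade executor can read the credentials; they never appear in plain env vars
exchangeKeys.grantRead(tradeExecutorFn);
tradeExecutorFn.addEnvironment('EXCHANGE_SECRET_NAME', exchangeKeys.secretName);

At runtime, execute_real_trade would fetch the keys with the Secrets Manager GetSecretValue API rather than reading them from the environment.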

Cost Optimization

  • Lambda Memory: Right-size ML Lambda functions (3GB for predictions)
  • DynamoDB: Use on-demand billing for unpredictable workloads
  • S3 Lifecycle: Archive old price data to Glacier after 30 days
  • Reserved Capacity: Consider SageMaker Savings Plans for always-on inference endpoints

Conclusion

This Bitcoin trading system demonstrates how modern cloud architecture can power sophisticated financial applications. The combination of event-driven design, multiple ML models, and comprehensive risk management creates a robust platform for automated trading.

Key Takeaways

  • Multi-Model Predictions: Ensemble approach improves accuracy and reduces false signals
  • Event-Driven Architecture: Serverless design scales automatically with market activity
  • Risk Management: Built-in safety checks prevent catastrophic losses
  • Infrastructure as Code: CDK enables reproducible, version-controlled deployments
  • Cost-Effective: Pay only for actual trading activity and predictions

The complete implementation provides a foundation for building production-grade trading systems that can be extended with additional strategies, risk models, and exchange integrations.

Disclaimer: This system is for educational purposes. Cryptocurrency trading involves substantial risk. Always test thoroughly and never invest more than you can afford to lose.

Yen