The cryptocurrency market operates 24/7 with extreme volatility, making manual trading challenging and inefficient. This post explores building a production-ready automated Bitcoin trading system that leverages AWS services, machine learning models, and infrastructure-as-code practices to make intelligent trading decisions based on historical data analysis and real-time price predictions.
The Problem: Automated Crypto Trading at Scale
Building a reliable cryptocurrency trading system presents several unique challenges:
- Real-Time Data Processing: Bitcoin prices change every second, requiring near-instant analysis
- Historical Data Management: Storing and analyzing years of price history for pattern recognition
- ML Model Integration: Leveraging multiple prediction models (Bedrock, HuggingFace, custom APIs)
- Risk Management: Implementing safeguards against catastrophic losses
- Event-Driven Architecture: Responding to market events with millisecond latency
- Cost Optimization: Running ML inference efficiently without breaking the bank
- Audit Trail: Maintaining complete transaction history for compliance and analysis
Architecture Overview: Event-Driven ML Trading Pipeline
Our architecture combines serverless components, managed ML services, and event-driven patterns to create a scalable, cost-effective trading system:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ CoinGecko/ │ │ EventBridge │ │ Lambda │
│ Binance API │ -> │ Scheduler │ -> │ Price Collector │
│ │ │ (Every 1 min) │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
v
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ DynamoDB │ │ S3 Bucket │ │ Timestream │
│ Historical Data │ <- │ Raw Price Data │ <- │ Time Series │
│ │ │ │ │ Database │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
v
┌────────────────────────────────────────┐
│ ML Prediction Layer │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────┐ │
│ │ Bedrock │ │HuggingFace│ │ API │ │
│ │ Claude │ │ Model │ │Custom│ │
│ └──────────┘ └──────────┘ └──────┘ │
└────────────────────────────────────────┘
│
v
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ SNS Topic │ │ Step Functions │ │ Lambda │
│ Trade Alerts │ <- │ Trading Logic │ <- │ Trade Executor │
│ │ │ │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
v
┌─────────────────┐
│ DynamoDB │
│ Trade History │
│ │
└─────────────────┘
Data Flow Breakdown:
- Price Collection: EventBridge triggers Lambda to fetch Bitcoin prices every minute
- Storage: Raw data stored in S3, processed data in DynamoDB and Timestream
- Historical Analysis: Lambda analyzes patterns from historical data
- ML Prediction: Multiple models (Bedrock, HuggingFace, custom) generate predictions
- Decision Engine: Step Functions orchestrates trading logic based on predictions
- Trade Execution: Lambda executes buy/sell orders via exchange APIs
- Notification: SNS alerts stakeholders of trade executions
- Audit: All trades logged in DynamoDB for compliance and analysis
Infrastructure as Code: AWS CDK Implementation
Let’s build this system step-by-step using AWS CDK with TypeScript.
Project Structure
bitcoin-trading-cdk/
├── lib/
│ ├── bitcoin-trading-stack.ts
│ ├── constructs/
│ │ ├── data-ingestion-construct.ts
│ │ ├── ml-prediction-construct.ts
│ │ ├── trading-engine-construct.ts
│ │ └── monitoring-construct.ts
│ └── lambda/
│ ├── price-collector/
│ ├── ml-predictor/
│ ├── trade-executor/
│ └── risk-manager/
├── config/
│ └── trading-config.json
└── bin/
└── bitcoin-trading-app.ts
Configuration Management
Environment-specific configurations for risk management and cost optimization:
1// config/trading-config.json
2{
3 "dev": {
4 "priceCollectionInterval": 5, // minutes
5 "maxTradeAmount": 100, // USD
6 "enableRealTrading": false,
7 "mlModels": ["bedrock"],
8 "riskThreshold": 0.7
9 },
10 "prod": {
11 "priceCollectionInterval": 1,
12 "maxTradeAmount": 10000,
13 "enableRealTrading": true,
14 "mlModels": ["bedrock", "huggingface", "custom"],
15 "riskThreshold": 0.85
16 }
17}
1. Data Ingestion Layer
The data ingestion construct handles Bitcoin price collection and historical storage:
1// lib/constructs/data-ingestion-construct.ts
2import * as cdk from 'aws-cdk-lib';
3import * as lambda from 'aws-cdk-lib/aws-lambda';
4import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
5import * as s3 from 'aws-cdk-lib/aws-s3';
6import * as events from 'aws-cdk-lib/aws-events';
7import * as targets from 'aws-cdk-lib/aws-events-targets';
8import * as timestream from 'aws-cdk-lib/aws-timestream';
9import { Construct } from 'constructs';
10
11export interface DataIngestionProps {
12 collectionInterval: number; // minutes
13}
14
15export class DataIngestionConstruct extends Construct {
16 public readonly priceTable: dynamodb.Table;
17 public readonly priceBucket: s3.Bucket;
18 public readonly timestreamDb: timestream.CfnDatabase;
19
20 constructor(scope: Construct, id: string, props: DataIngestionProps) {
21 super(scope, id);
22
23 // S3 bucket for raw price data
24 this.priceBucket = new s3.Bucket(this, 'PriceDataBucket', {
25 versioned: true,
26 lifecycleRules: [{
27 expiration: cdk.Duration.days(90),
28 transitions: [{
29 storageClass: s3.StorageClass.GLACIER,
30 transitionAfter: cdk.Duration.days(30),
31 }],
32 }],
33 encryption: s3.BucketEncryption.S3_MANAGED,
34 });
35
36 // DynamoDB table for queryable historical data
37 this.priceTable = new dynamodb.Table(this, 'BitcoinPriceTable', {
38 partitionKey: { name: 'symbol', type: dynamodb.AttributeType.STRING },
39 sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
40 billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
41 timeToLiveAttribute: 'ttl',
42 pointInTimeRecovery: true,
43 stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
44 });
45
46 // Timestream for time-series analysis
47 this.timestreamDb = new timestream.CfnDatabase(this, 'PriceTimestream', {
48 databaseName: 'bitcoin-prices',
49 });
50
51 const timestreamTable = new timestream.CfnTable(this, 'PriceTimestreamTable', {
52 databaseName: this.timestreamDb.ref,
53 tableName: 'price-history',
54 retentionProperties: {
55 MemoryStoreRetentionPeriodInHours: '24',
56 MagneticStoreRetentionPeriodInDays: '365',
57 },
58 });
59
60 // Lambda for price collection
61 const priceCollectorFn = new lambda.Function(this, 'PriceCollector', {
62 runtime: lambda.Runtime.PYTHON_3_11,
63 handler: 'index.handler',
64 code: lambda.Code.fromAsset('lib/lambda/price-collector'),
65 timeout: cdk.Duration.seconds(60),
66 memorySize: 512,
67 environment: {
68 PRICE_TABLE: this.priceTable.tableName,
69 PRICE_BUCKET: this.priceBucket.bucketName,
70 TIMESTREAM_DB: this.timestreamDb.ref,
71 TIMESTREAM_TABLE: timestreamTable.ref,
72 },
73 });
74
75 // Grant permissions
76 this.priceTable.grantWriteData(priceCollectorFn);
77 this.priceBucket.grantWrite(priceCollectorFn);
78
79 // EventBridge rule to trigger price collection
80 const collectionRule = new events.Rule(this, 'PriceCollectionRule', {
81 schedule: events.Schedule.rate(cdk.Duration.minutes(props.collectionInterval)),
82 description: 'Trigger Bitcoin price collection',
83 });
84
85 collectionRule.addTarget(new targets.LambdaFunction(priceCollectorFn));
86 }
87}
Price Collection Lambda Implementation
1# lib/lambda/price-collector/index.py
2import json
3import boto3
4import requests
5from datetime import datetime, timedelta
6from decimal import Decimal
7
8dynamodb = boto3.resource('dynamodb')
9s3_client = boto3.client('s3')
10timestream_client = boto3.client('timestream-write')
11
12PRICE_TABLE = os.environ['PRICE_TABLE']
13PRICE_BUCKET = os.environ['PRICE_BUCKET']
14TIMESTREAM_DB = os.environ['TIMESTREAM_DB']
15TIMESTREAM_TABLE = os.environ['TIMESTREAM_TABLE']
16
17def handler(event, context):
18 """Collect Bitcoin price from multiple sources"""
19
20 # Fetch from multiple sources for reliability
21 sources = [
22 fetch_coinGecko_price(),
23 fetch_binance_price(),
24 fetch_coinbase_price(),
25 ]
26
27 # Calculate median price for accuracy
28 prices = [s['price'] for s in sources if s]
29 median_price = sorted(prices)[len(prices) // 2]
30
31 timestamp = int(datetime.utcnow().timestamp())
32
33 # Prepare data
34 price_data = {
35 'symbol': 'BTC-USD',
36 'timestamp': timestamp,
37 'price': Decimal(str(median_price)),
38 'volume_24h': Decimal(str(sources[0].get('volume', 0))),
39 'market_cap': Decimal(str(sources[0].get('market_cap', 0))),
40 'price_change_24h': Decimal(str(sources[0].get('price_change', 0))),
41 'sources': [s['source'] for s in sources if s],
42 'ttl': timestamp + (90 * 24 * 60 * 60), # 90 days retention
43 }
44
45 # Store in DynamoDB
46 table = dynamodb.Table(PRICE_TABLE)
47 table.put_item(Item=price_data)
48
49 # Store raw data in S3
50 s3_key = f"raw-prices/{datetime.utcnow().strftime('%Y/%m/%d')}/{timestamp}.json"
51 s3_client.put_object(
52 Bucket=PRICE_BUCKET,
53 Key=s3_key,
54 Body=json.dumps(sources, default=str),
55 )
56
57 # Write to Timestream for time-series analysis
58 write_to_timestream(price_data)
59
60 return {
61 'statusCode': 200,
62 'body': json.dumps({
63 'price': float(median_price),
64 'timestamp': timestamp,
65 })
66 }
67
68def fetch_coinGecko_price():
69 """Fetch price from CoinGecko API"""
70 try:
71 url = "https://api.coingecko.com/api/v3/simple/price"
72 params = {
73 'ids': 'bitcoin',
74 'vs_currencies': 'usd',
75 'include_24hr_vol': 'true',
76 'include_24hr_change': 'true',
77 'include_market_cap': 'true'
78 }
79 response = requests.get(url, params=params, timeout=10)
80 data = response.json()['bitcoin']
81
82 return {
83 'source': 'coingecko',
84 'price': data['usd'],
85 'volume': data.get('usd_24h_vol', 0),
86 'market_cap': data.get('usd_market_cap', 0),
87 'price_change': data.get('usd_24h_change', 0),
88 }
89 except Exception as e:
90 print(f"CoinGecko fetch error: {e}")
91 return None
92
93def fetch_binance_price():
94 """Fetch price from Binance API"""
95 try:
96 url = "https://api.binance.com/api/v3/ticker/24hr"
97 params = {'symbol': 'BTCUSDT'}
98 response = requests.get(url, params=params, timeout=10)
99 data = response.json()
100
101 return {
102 'source': 'binance',
103 'price': float(data['lastPrice']),
104 'volume': float(data['volume']),
105 'price_change': float(data['priceChangePercent']),
106 }
107 except Exception as e:
108 print(f"Binance fetch error: {e}")
109 return None
110
111def write_to_timestream(price_data):
112 """Write price data to Timestream for time-series analysis"""
113 records = [{
114 'Time': str(price_data['timestamp'] * 1000), # milliseconds
115 'TimeUnit': 'MILLISECONDS',
116 'Dimensions': [
117 {'Name': 'symbol', 'Value': price_data['symbol']},
118 ],
119 'MeasureName': 'price_metrics',
120 'MeasureValueType': 'MULTI',
121 'MeasureValues': [
122 {'Name': 'price', 'Value': str(price_data['price']), 'Type': 'DOUBLE'},
123 {'Name': 'volume', 'Value': str(price_data['volume_24h']), 'Type': 'DOUBLE'},
124 {'Name': 'market_cap', 'Value': str(price_data['market_cap']), 'Type': 'DOUBLE'},
125 ],
126 }]
127
128 try:
129 timestream_client.write_records(
130 DatabaseName=TIMESTREAM_DB,
131 TableName=TIMESTREAM_TABLE,
132 Records=records,
133 )
134 except Exception as e:
135 print(f"Timestream write error: {e}")
2. ML Prediction Layer
The prediction layer integrates multiple ML models for robust price forecasting:
1// lib/constructs/ml-prediction-construct.ts
2import * as cdk from 'aws-cdk-lib';
3import * as lambda from 'aws-cdk-lib/aws-lambda';
4import * as iam from 'aws-cdk-lib/aws-iam';
5import * as sagemaker from 'aws-cdk-lib/aws-sagemaker';
6import { Construct } from 'constructs';
7
8export interface MLPredictionProps {
9 priceTableName: string;
10 enabledModels: string[]; // ['bedrock', 'huggingface', 'custom']
11}
12
13export class MLPredictionConstruct extends Construct {
14 public readonly predictorFunction: lambda.Function;
15
16 constructor(scope: Construct, id: string, props: MLPredictionProps) {
17 super(scope, id);
18
19 // Lambda Layer for ML libraries
20 const mlLayer = new lambda.LayerVersion(this, 'MLLibrariesLayer', {
21 code: lambda.Code.fromAsset('lib/lambda/layers/ml-libs'),
22 compatibleRuntimes: [lambda.Runtime.PYTHON_3_11],
23 description: 'ML libraries: numpy, pandas, scikit-learn',
24 });
25
26 // Prediction Lambda
27 this.predictorFunction = new lambda.Function(this, 'MLPredictor', {
28 runtime: lambda.Runtime.PYTHON_3_11,
29 handler: 'index.handler',
30 code: lambda.Code.fromAsset('lib/lambda/ml-predictor'),
31 timeout: cdk.Duration.seconds(300),
32 memorySize: 3008, // Max memory for ML inference
33 layers: [mlLayer],
34 environment: {
35 PRICE_TABLE: props.priceTableName,
36 ENABLED_MODELS: JSON.stringify(props.enabledModels),
37 BEDROCK_MODEL_ID: 'anthropic.claude-3-sonnet-20240229-v1:0',
38 },
39 });
40
41 // Grant Bedrock access
42 if (props.enabledModels.includes('bedrock')) {
43 this.predictorFunction.addToRolePolicy(new iam.PolicyStatement({
44 actions: [
45 'bedrock:InvokeModel',
46 'bedrock:InvokeModelWithResponseStream',
47 ],
48 resources: ['*'],
49 }));
50 }
51
52 // Grant SageMaker access for HuggingFace models
53 if (props.enabledModels.includes('huggingface')) {
54 this.predictorFunction.addToRolePolicy(new iam.PolicyStatement({
55 actions: [
56 'sagemaker:InvokeEndpoint',
57 ],
58 resources: ['*'],
59 }));
60 }
61
62 // Grant DynamoDB read access
63 const priceTable = dynamodb.Table.fromTableName(this, 'PriceTable', props.priceTableName);
64 priceTable.grantReadData(this.predictorFunction);
65 }
66}
ML Prediction Lambda Implementation
1# lib/lambda/ml-predictor/index.py
2import json
3import boto3
4import numpy as np
5import pandas as pd
6from datetime import datetime, timedelta
7from typing import Dict, List, Tuple
8
9bedrock_runtime = boto3.client('bedrock-runtime')
10sagemaker_runtime = boto3.client('sagemaker-runtime')
11dynamodb = boto3.resource('dynamodb')
12
13PRICE_TABLE = os.environ['PRICE_TABLE']
14ENABLED_MODELS = json.loads(os.environ['ENABLED_MODELS'])
15BEDROCK_MODEL_ID = os.environ['BEDROCK_MODEL_ID']
16
17def handler(event, context):
18 """Generate price predictions using multiple ML models"""
19
20 # Fetch historical data
21 historical_data = fetch_historical_prices(days=30)
22
23 if len(historical_data) < 100:
24 return {'error': 'Insufficient historical data'}
25
26 # Generate predictions from all enabled models
27 predictions = {}
28
29 if 'bedrock' in ENABLED_MODELS:
30 predictions['bedrock'] = predict_with_bedrock(historical_data)
31
32 if 'huggingface' in ENABLED_MODELS:
33 predictions['huggingface'] = predict_with_huggingface(historical_data)
34
35 if 'custom' in ENABLED_MODELS:
36 predictions['custom'] = predict_with_custom_model(historical_data)
37
38 # Ensemble predictions for robustness
39 final_prediction = ensemble_predictions(predictions)
40
41 return {
42 'statusCode': 200,
43 'body': json.dumps({
44 'prediction': final_prediction,
45 'individual_predictions': predictions,
46 'confidence': calculate_confidence(predictions),
47 'timestamp': int(datetime.utcnow().timestamp()),
48 })
49 }
50
51def fetch_historical_prices(days: int = 30) -> pd.DataFrame:
52 """Fetch historical price data from DynamoDB"""
53 table = dynamodb.Table(PRICE_TABLE)
54
55 start_timestamp = int((datetime.utcnow() - timedelta(days=days)).timestamp())
56
57 response = table.query(
58 KeyConditionExpression='symbol = :symbol AND #ts >= :start_ts',
59 ExpressionAttributeNames={'#ts': 'timestamp'},
60 ExpressionAttributeValues={
61 ':symbol': 'BTC-USD',
62 ':start_ts': start_timestamp,
63 },
64 ScanIndexForward=True, # Ascending order
65 )
66
67 df = pd.DataFrame(response['Items'])
68 df['price'] = df['price'].astype(float)
69 df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
70 df = df.sort_values('timestamp')
71
72 return df
73
74def predict_with_bedrock(data: pd.DataFrame) -> Dict:
75 """Use AWS Bedrock Claude for price prediction"""
76
77 # Prepare context for Claude
78 recent_prices = data.tail(50)['price'].tolist()
79 price_changes = data['price_change_24h'].tail(10).tolist()
80
81 prompt = f"""You are a Bitcoin market analyst. Based on the following data, predict the Bitcoin price movement for the next hour.
82
83Recent prices (last 50 data points): {recent_prices}
84Recent 24h price changes: {price_changes}
85
86Current price: ${recent_prices[-1]:.2f}
87
88Provide your analysis in JSON format:
89{{
90 "predicted_price": <number>,
91 "direction": "<up|down|stable>",
92 "confidence": <0-1>,
93 "reasoning": "<brief explanation>"
94}}"""
95
96 try:
97 response = bedrock_runtime.invoke_model(
98 modelId=BEDROCK_MODEL_ID,
99 body=json.dumps({
100 "anthropic_version": "bedrock-2023-05-31",
101 "max_tokens": 1000,
102 "messages": [{
103 "role": "user",
104 "content": prompt
105 }]
106 })
107 )
108
109 result = json.loads(response['body'].read())
110 prediction_text = result['content'][0]['text']
111
112 # Extract JSON from response
113 prediction = json.loads(prediction_text)
114
115 return {
116 'model': 'bedrock-claude',
117 'predicted_price': prediction['predicted_price'],
118 'direction': prediction['direction'],
119 'confidence': prediction['confidence'],
120 'reasoning': prediction['reasoning'],
121 }
122
123 except Exception as e:
124 print(f"Bedrock prediction error: {e}")
125 return None
126
127def predict_with_huggingface(data: pd.DataFrame) -> Dict:
128 """Use HuggingFace model deployed on SageMaker"""
129
130 # Prepare features for time-series model
131 features = prepare_time_series_features(data)
132
133 try:
134 response = sagemaker_runtime.invoke_endpoint(
135 EndpointName='bitcoin-price-predictor',
136 ContentType='application/json',
137 Body=json.dumps({'instances': features.tolist()})
138 )
139
140 result = json.loads(response['Body'].read())
141
142 return {
143 'model': 'huggingface-timeseries',
144 'predicted_price': result['predictions'][0],
145 'confidence': result['confidence'],
146 }
147
148 except Exception as e:
149 print(f"HuggingFace prediction error: {e}")
150 return None
151
152def predict_with_custom_model(data: pd.DataFrame) -> Dict:
153 """Use custom LSTM/GRU model for price prediction"""
154
155 # Simple moving average + momentum-based prediction
156 prices = data['price'].values
157
158 # Technical indicators
159 sma_20 = np.mean(prices[-20:])
160 sma_50 = np.mean(prices[-50:])
161 momentum = (prices[-1] - prices[-10]) / prices[-10]
162
163 # Simple prediction logic
164 if sma_20 > sma_50 and momentum > 0:
165 direction = 'up'
166 predicted_price = prices[-1] * (1 + momentum * 0.5)
167 elif sma_20 < sma_50 and momentum < 0:
168 direction = 'down'
169 predicted_price = prices[-1] * (1 + momentum * 0.5)
170 else:
171 direction = 'stable'
172 predicted_price = prices[-1]
173
174 return {
175 'model': 'custom-technical-analysis',
176 'predicted_price': float(predicted_price),
177 'direction': direction,
178 'confidence': 0.7,
179 'indicators': {
180 'sma_20': float(sma_20),
181 'sma_50': float(sma_50),
182 'momentum': float(momentum),
183 }
184 }
185
186def ensemble_predictions(predictions: Dict) -> Dict:
187 """Combine multiple predictions using weighted ensemble"""
188
189 valid_predictions = [p for p in predictions.values() if p is not None]
190
191 if not valid_predictions:
192 return None
193
194 # Weight predictions by confidence
195 total_confidence = sum(p['confidence'] for p in valid_predictions)
196 weighted_price = sum(
197 p['predicted_price'] * p['confidence']
198 for p in valid_predictions
199 ) / total_confidence
200
201 # Determine consensus direction
202 directions = [p['direction'] for p in valid_predictions]
203 consensus_direction = max(set(directions), key=directions.count)
204
205 return {
206 'predicted_price': weighted_price,
207 'direction': consensus_direction,
208 'num_models': len(valid_predictions),
209 }
210
211def calculate_confidence(predictions: Dict) -> float:
212 """Calculate overall confidence based on model agreement"""
213
214 valid_predictions = [p for p in predictions.values() if p is not None]
215
216 if not valid_predictions:
217 return 0.0
218
219 # Check direction agreement
220 directions = [p['direction'] for p in valid_predictions]
221 agreement_ratio = directions.count(max(set(directions), key=directions.count)) / len(directions)
222
223 # Average confidence weighted by agreement
224 avg_confidence = np.mean([p['confidence'] for p in valid_predictions])
225
226 return float(avg_confidence * agreement_ratio)
3. Trading Engine with Risk Management
Step Functions orchestrates the trading logic with built-in safety checks:
1// lib/constructs/trading-engine-construct.ts
2import * as cdk from 'aws-cdk-lib';
3import * as lambda from 'aws-cdk-lib/aws-lambda';
4import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
5import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
6import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
7import * as sns from 'aws-cdk-lib/aws-sns';
8import * as events from 'aws-cdk-lib/aws-events';
9import * as targets from 'aws-cdk-lib/aws-events-targets';
10import { Construct } from 'constructs';
11
12export interface TradingEngineProps {
13 predictorFunction: lambda.IFunction;
14 maxTradeAmount: number;
15 riskThreshold: number;
16 enableRealTrading: boolean;
17}
18
19export class TradingEngineConstruct extends Construct {
20 public readonly tradeTable: dynamodb.Table;
21 public readonly alertTopic: sns.Topic;
22
23 constructor(scope: Construct, id: string, props: TradingEngineProps) {
24 super(scope, id);
25
26 // Trade history table
27 this.tradeTable = new dynamodb.Table(this, 'TradeHistory', {
28 partitionKey: { name: 'trade_id', type: dynamodb.AttributeType.STRING },
29 sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
30 billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
31 });
32
33 // SNS topic for trade alerts
34 this.alertTopic = new sns.Topic(this, 'TradeAlerts', {
35 displayName: 'Bitcoin Trading Alerts',
36 });
37
38 // Risk manager Lambda
39 const riskManagerFn = new lambda.Function(this, 'RiskManager', {
40 runtime: lambda.Runtime.PYTHON_3_11,
41 handler: 'index.handler',
42 code: lambda.Code.fromAsset('lib/lambda/risk-manager'),
43 timeout: cdk.Duration.seconds(30),
44 environment: {
45 TRADE_TABLE: this.tradeTable.tableName,
46 MAX_TRADE_AMOUNT: props.maxTradeAmount.toString(),
47 RISK_THRESHOLD: props.riskThreshold.toString(),
48 },
49 });
50
51 this.tradeTable.grantReadData(riskManagerFn);
52
53 // Trade executor Lambda
54 const tradeExecutorFn = new lambda.Function(this, 'TradeExecutor', {
55 runtime: lambda.Runtime.PYTHON_3_11,
56 handler: 'index.handler',
57 code: lambda.Code.fromAsset('lib/lambda/trade-executor'),
58 timeout: cdk.Duration.seconds(60),
59 environment: {
60 TRADE_TABLE: this.tradeTable.tableName,
61 ALERT_TOPIC_ARN: this.alertTopic.topicArn,
62 ENABLE_REAL_TRADING: props.enableRealTrading.toString(),
63 },
64 });
65
66 this.tradeTable.grantWriteData(tradeExecutorFn);
67 this.alertTopic.grantPublish(tradeExecutorFn);
68
69 // Step Functions workflow
70 const getPrediction = new tasks.LambdaInvoke(this, 'GetPrediction', {
71 lambdaFunction: props.predictorFunction,
72 outputPath: '$.Payload',
73 });
74
75 const checkRisk = new tasks.LambdaInvoke(this, 'CheckRisk', {
76 lambdaFunction: riskManagerFn,
77 outputPath: '$.Payload',
78 });
79
80 const executeTrade = new tasks.LambdaInvoke(this, 'ExecuteTrade', {
81 lambdaFunction: tradeExecutorFn,
82 outputPath: '$.Payload',
83 });
84
85 const riskCheckPassed = new sfn.Choice(this, 'RiskCheckPassed')
86 .when(sfn.Condition.booleanEquals('$.risk_approved', true), executeTrade)
87 .otherwise(new sfn.Succeed(this, 'TradeRejected'));
88
89 const definition = getPrediction
90 .next(checkRisk)
91 .next(riskCheckPassed);
92
93 const tradingStateMachine = new sfn.StateMachine(this, 'TradingStateMachine', {
94 definition,
95 timeout: cdk.Duration.minutes(5),
96 });
97
98 // EventBridge rule to trigger trading workflow
99 const tradingRule = new events.Rule(this, 'TradingTrigger', {
100 schedule: events.Schedule.rate(cdk.Duration.minutes(5)),
101 description: 'Trigger trading analysis and execution',
102 });
103
104 tradingRule.addTarget(new targets.SfnStateMachine(tradingStateMachine));
105 }
106}
Trade Execution with Safety Checks
1# lib/lambda/trade-executor/index.py
2import json
3import boto3
4import requests
5from datetime import datetime
6from decimal import Decimal
7
8dynamodb = boto3.resource('dynamodb')
9sns_client = boto3.client('sns')
10
11TRADE_TABLE = os.environ['TRADE_TABLE']
12ALERT_TOPIC_ARN = os.environ['ALERT_TOPIC_ARN']
13ENABLE_REAL_TRADING = os.environ['ENABLE_REAL_TRADING'].lower() == 'true'
14
15def handler(event, context):
16 """Execute trade based on prediction and risk assessment"""
17
18 prediction = event['prediction']
19 risk_analysis = event['risk_analysis']
20
21 if not risk_analysis['approved']:
22 return {
23 'trade_executed': False,
24 'reason': risk_analysis['reason']
25 }
26
27 # Determine trade action
28 current_price = prediction['current_price']
29 predicted_price = prediction['predicted_price']
30 confidence = prediction['confidence']
31
32 # Trading logic
33 price_diff_pct = (predicted_price - current_price) / current_price * 100
34
35 if price_diff_pct > 2 and confidence > 0.75:
36 action = 'BUY'
37 amount = calculate_position_size(risk_analysis, predicted_price)
38 elif price_diff_pct < -2 and confidence > 0.75:
39 action = 'SELL'
40 amount = calculate_position_size(risk_analysis, predicted_price)
41 else:
42 action = 'HOLD'
43 amount = 0
44
45 if action == 'HOLD':
46 return {
47 'trade_executed': False,
48 'reason': 'No significant trading signal'
49 }
50
51 # Execute trade
52 if ENABLE_REAL_TRADING:
53 trade_result = execute_real_trade(action, amount, current_price)
54 else:
55 trade_result = simulate_trade(action, amount, current_price)
56
57 # Log trade
58 log_trade(action, amount, current_price, predicted_price, trade_result)
59
60 # Send alert
61 send_trade_alert(action, amount, current_price, trade_result)
62
63 return {
64 'trade_executed': True,
65 'action': action,
66 'amount': amount,
67 'price': current_price,
68 'trade_id': trade_result['trade_id']
69 }
70
71def execute_real_trade(action, amount, price):
72 """Execute trade on exchange (e.g., Binance, Coinbase)"""
73 # This would integrate with actual exchange APIs
74 # Example: Binance API
75 pass
76
77def simulate_trade(action, amount, price):
78 """Simulate trade for testing"""
79 import uuid
80 return {
81 'trade_id': str(uuid.uuid4()),
82 'simulated': True,
83 'status': 'filled'
84 }
85
86def log_trade(action, amount, price, predicted_price, result):
87 """Log trade to DynamoDB"""
88 table = dynamodb.Table(TRADE_TABLE)
89
90 timestamp = int(datetime.utcnow().timestamp())
91
92 table.put_item(Item={
93 'trade_id': result['trade_id'],
94 'timestamp': timestamp,
95 'action': action,
96 'amount': Decimal(str(amount)),
97 'price': Decimal(str(price)),
98 'predicted_price': Decimal(str(predicted_price)),
99 'simulated': result.get('simulated', False),
100 'status': result['status'],
101 })
102
103def send_trade_alert(action, amount, price, result):
104 """Send SNS notification"""
105 message = f"""
106Bitcoin Trade Executed
107
108Action: {action}
109Amount: ${amount:.2f}
110Price: ${price:.2f}
111Trade ID: {result['trade_id']}
112Status: {result['status']}
113{'[SIMULATED]' if result.get('simulated') else '[REAL]'}
114 """
115
116 sns_client.publish(
117 TopicArn=ALERT_TOPIC_ARN,
118 Subject=f'Bitcoin Trade Alert: {action}',
119 Message=message
120 )
Deployment and Configuration
Deploy the complete trading system:
1# Install dependencies
2npm install
3
4# Deploy to development (simulated trading)
5cdk deploy --all --context env=dev
6
7# Deploy to production (real trading - use with caution!)
8cdk deploy --all --context env=prod \
9 --context enableRealTrading=true \
10 --context maxTradeAmount=10000
Monitoring and Performance Tracking
Track system performance with CloudWatch dashboards:
1// Add monitoring construct
2new MonitoringConstruct(this, 'Monitoring', {
3 predictorFunction,
4 tradeTable,
5 metrics: [
6 'PredictionAccuracy',
7 'TradeSuccessRate',
8 'PortfolioValue',
9 'RiskScore'
10 ]
11});
Security Best Practices
- API Key Management: Store exchange API keys in AWS Secrets Manager
- Encryption: Enable encryption for all data at rest and in transit
- IAM Policies: Follow least-privilege principle for all Lambda functions
- VPC Isolation: Deploy sensitive components in private subnets
- Rate Limiting: Implement throttling to prevent excessive trading
Cost Optimization
- Lambda Memory: Right-size ML Lambda functions (3GB for predictions)
- DynamoDB: Use on-demand billing for unpredictable workloads
- S3 Lifecycle: Archive old price data to Glacier after 30 days
- Reserved Capacity: Consider reserved instances for SageMaker endpoints
Conclusion
This Bitcoin trading system demonstrates how modern cloud architecture can power sophisticated financial applications. The combination of event-driven design, multiple ML models, and comprehensive risk management creates a robust platform for automated trading.
Key Takeaways
- Multi-Model Predictions: Ensemble approach improves accuracy and reduces false signals
- Event-Driven Architecture: Serverless design scales automatically with market activity
- Risk Management: Built-in safety checks prevent catastrophic losses
- Infrastructure as Code: CDK enables reproducible, version-controlled deployments
- Cost-Effective: Pay only for actual trading activity and predictions
The complete implementation provides a foundation for building production-grade trading systems that can be extended with additional strategies, risk models, and exchange integrations.
Disclaimer: This system is for educational purposes. Cryptocurrency trading involves substantial risk. Always test thoroughly and never invest more than you can afford to lose.
