Social media has become a powerful force in stock market movements: a single influential post can move a share price by several percent within minutes. This post explores building a production-ready automated US stock trading system that monitors X.com (Twitter) in real time, analyzes sentiment using multiple ML models, and executes trades on a configurable watchlist of stocks such as TSLA, GOOG, and NVDA based on social media intelligence.
The Challenge: Trading on Social Sentiment
Building a reliable sentiment-driven trading system presents unique technical and business challenges:
- Real-Time Tweet Streaming: Capturing millions of tweets per hour and filtering relevant content
- Multi-Stock Monitoring: Tracking sentiment for dozens of stocks simultaneously
- Sentiment Analysis at Scale: Processing natural language with ML models in real-time
- Signal Quality: Distinguishing genuine market signals from noise and manipulation
- Influencer Impact: Weighing tweets by author credibility and follower count
- Market Hours: Handling pre-market, regular hours, and after-hours trading windows
- Risk Management: Preventing losses from false signals or coordinated manipulation
- Compliance: Meeting SEC regulations for automated trading systems
Why X.com + AWS for Sentiment Trading?
Before diving into implementation, let’s understand why this technology stack excels for social sentiment trading:
X.com: The Pulse of Market Sentiment
X.com (formerly Twitter) provides unparalleled real-time market sentiment data:
```json
{
  "tweet_id": "1234567890",
  "author": {
    "username": "elonmusk",
    "followers": 150000000,
    "verified": true,
    "influence_score": 0.98
  },
  "text": "$TSLA production numbers exceeded expectations. Exciting times ahead!",
  "mentions": ["TSLA"],
  "timestamp": "2026-01-24T09:45:00Z",
  "engagement": {
    "likes": 250000,
    "retweets": 45000,
    "replies": 8000
  }
}
```
Key advantages:
- Real-time data with sub-second latency
- Influencer insights from market-moving accounts
- Public sentiment aggregation across millions of users
- Early indicators often preceding official news
- Trend detection through hashtags and mentions
AWS Serverless Architecture: Scalable Real-Time Processing
| Traditional Approach | AWS Serverless Approach |
|---|---|
| Self-hosted Kafka clusters | Kinesis Data Streams |
| Custom NLP pipelines | Bedrock + Comprehend |
| Manual scaling | Automatic Lambda scaling |
| Complex infrastructure | Fully managed services |
| High operational cost | Pay-per-use pricing |
Architecture Overview: Real-Time Sentiment Trading Pipeline
Our architecture combines event streaming, natural language processing, and trading execution in a fully serverless design:
```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│    X.com API    │    │     Lambda      │    │     Kinesis     │
│   (Streaming)   │ -> │ Tweet Collector │ -> │   Data Stream   │
│   Filtered by   │    │                 │    │                 │
│  Stock Symbols  │    └─────────────────┘    └─────────────────┘
└─────────────────┘                                    │
                                                       v
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│    DynamoDB     │    │     Lambda      │    │     Lambda      │
│   Tweet Cache   │ <- │Stream Processor │ <- │    Sentiment    │
│                 │    │                 │    │    Analyzer     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                       │
                                                       v
            ┌────────────────────────────────────────┐
            │        Sentiment Analysis Layer        │
            │                                        │
            │  ┌──────────┐ ┌───────────┐ ┌───────┐  │
            │  │ Bedrock  │ │HuggingFace│ │  AWS  │  │
            │  │  Claude  │ │  FinBERT  │ │Compre-│  │
            │  │          │ │   Model   │ │ hend  │  │
            │  └──────────┘ └───────────┘ └───────┘  │
            └────────────────────────────────────────┘
                                │
                                v
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│    DynamoDB     │    │ Step Functions  │    │     Lambda      │
│ Sentiment Score │ -> │  Trading Logic  │ -> │ Trade Executor  │
│   Aggregator    │    │                 │    │  (Alpaca API)   │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                       │
                                                       v
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│    SNS Topic    │    │   CloudWatch    │    │    DynamoDB     │
│  Trade Alerts   │    │   Dashboards    │    │  Trade History  │
│                 │    │    & Metrics    │    │   & Portfolio   │
└─────────────────┘    └─────────────────┘    └─────────────────┘
```
Data Flow Breakdown:
1. Tweet Collection: X.com streaming API filtered by stock symbols ($TSLA, $GOOG, etc.)
2. Stream Processing: Kinesis Data Stream buffers incoming tweets
3. Sentiment Analysis: ML models analyze tweet sentiment and extract signals
4. Aggregation: Sentiment scores aggregated per stock with time-decay weighting
5. Trading Decisions: Step Functions orchestrates buy/sell logic based on sentiment thresholds
6. Trade Execution: Lambda executes trades via the Alpaca or Interactive Brokers API
7. Monitoring: Real-time dashboards track sentiment trends and portfolio performance
8. Alerts: SNS notifies stakeholders of significant sentiment shifts and trades
Infrastructure as Code: AWS CDK Implementation
Let’s build this system step-by-step using AWS CDK with TypeScript.
Project Structure
```
sentiment-trading-cdk/
├── lib/
│   ├── sentiment-trading-stack.ts
│   ├── constructs/
│   │   ├── twitter-ingestion-construct.ts
│   │   ├── sentiment-analysis-construct.ts
│   │   ├── trading-engine-construct.ts
│   │   ├── portfolio-management-construct.ts
│   │   └── monitoring-construct.ts
│   └── lambda/
│       ├── tweet-collector/
│       ├── sentiment-analyzer/
│       ├── signal-aggregator/
│       ├── trade-executor/
│       └── risk-manager/
├── config/
│   ├── stocks-config.json
│   └── trading-rules.json
└── bin/
    └── sentiment-trading-app.ts
```
Configuration Management
Stock-specific configurations with sentiment thresholds and position limits:
```json
// config/stocks-config.json
{
  "watchlist": [
    {
      "symbol": "TSLA",
      "name": "Tesla Inc.",
      "sentiment_threshold": 0.75,
      "max_position": 10000,
      "key_influencers": ["elonmusk", "teslarati", "WholeMarsBlog"],
      "keywords": ["Tesla", "TSLA", "Model", "FSD", "Cybertruck"],
      "trading_enabled": true
    },
    {
      "symbol": "GOOG",
      "name": "Alphabet Inc.",
      "sentiment_threshold": 0.70,
      "max_position": 15000,
      "key_influencers": ["sundarpichai", "Google", "googledevs"],
      "keywords": ["Google", "GOOG", "Alphabet", "AI", "Bard", "Gemini"],
      "trading_enabled": true
    },
    {
      "symbol": "NVDA",
      "name": "NVIDIA Corporation",
      "sentiment_threshold": 0.72,
      "max_position": 12000,
      "key_influencers": ["nvidia", "JensenHuang"],
      "keywords": ["NVIDIA", "NVDA", "GPU", "AI chip", "CUDA"],
      "trading_enabled": true
    },
    {
      "symbol": "META",
      "name": "Meta Platforms Inc.",
      "sentiment_threshold": 0.68,
      "max_position": 8000,
      "key_influencers": ["Meta", "zuck"],
      "keywords": ["Meta", "Facebook", "Instagram", "Metaverse", "VR"],
      "trading_enabled": true
    },
    {
      "symbol": "AAPL",
      "name": "Apple Inc.",
      "sentiment_threshold": 0.70,
      "max_position": 20000,
      "key_influencers": ["Apple", "tim_cook"],
      "keywords": ["Apple", "AAPL", "iPhone", "Vision Pro", "Mac"],
      "trading_enabled": true
    }
  ],
  "global_settings": {
    "sentiment_window_minutes": 30,
    "min_tweet_volume": 50,
    "influencer_weight_multiplier": 3.0,
    "max_daily_trades_per_stock": 5,
    "portfolio_max_allocation_pct": 20
  }
}
```

```json
// config/trading-rules.json
{
  "dev": {
    "enable_real_trading": false,
    "max_total_portfolio": 10000,
    "sentiment_confidence_threshold": 0.75,
    "min_sentiment_change": 0.15,
    "market_hours_only": false
  },
  "prod": {
    "enable_real_trading": true,
    "max_total_portfolio": 100000,
    "sentiment_confidence_threshold": 0.82,
    "min_sentiment_change": 0.20,
    "market_hours_only": true,
    "pre_market_enabled": true,
    "after_hours_enabled": false
  }
}
```
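Because both files ship with the deployment bundle, the runtime code can read them directly. A minimal sketch of a loader with a few sanity checks — the `load_configs` helper and its validation rules are illustrative, not part of the stack above:

```python
# config_loader.py -- hypothetical helper; file names match the config/ layout above
import json
import os

def load_configs(env: str = "dev", config_dir: str = "config"):
    """Load the watchlist and the environment-specific trading rules."""
    with open(os.path.join(config_dir, "stocks-config.json")) as f:
        stocks = json.load(f)
    with open(os.path.join(config_dir, "trading-rules.json")) as f:
        rules_by_env = json.load(f)

    rules = rules_by_env[env]

    # Basic sanity checks before any trading logic runs
    for stock in stocks["watchlist"]:
        assert 0.0 <= stock["sentiment_threshold"] <= 1.0, stock["symbol"]
        assert stock["max_position"] > 0, stock["symbol"]
    assert 0.0 <= rules["sentiment_confidence_threshold"] <= 1.0

    return stocks["watchlist"], stocks["global_settings"], rules

if __name__ == "__main__":
    watchlist, globals_, rules = load_configs("dev")
    print(f"{len(watchlist)} stocks, real trading: {rules['enable_real_trading']}")
```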
1. Twitter/X.com Ingestion Layer
The ingestion construct handles real-time tweet collection and filtering:
```typescript
// lib/constructs/twitter-ingestion-construct.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
import { Construct } from 'constructs';

export interface TwitterIngestionProps {
  stockWatchlist: any[];
  streamShardCount: number;
}

export class TwitterIngestionConstruct extends Construct {
  public readonly tweetStream: kinesis.Stream;
  public readonly tweetCache: dynamodb.Table;

  constructor(scope: Construct, id: string, props: TwitterIngestionProps) {
    super(scope, id);

    // Kinesis Data Stream for tweet ingestion
    this.tweetStream = new kinesis.Stream(this, 'TweetStream', {
      shardCount: props.streamShardCount,
      retentionPeriod: cdk.Duration.hours(24),
      encryption: kinesis.StreamEncryption.MANAGED,
    });

    // DynamoDB table for tweet deduplication and caching
    this.tweetCache = new dynamodb.Table(this, 'TweetCache', {
      partitionKey: { name: 'tweet_id', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      timeToLiveAttribute: 'ttl',
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    });

    // Add GSI for querying by stock symbol
    this.tweetCache.addGlobalSecondaryIndex({
      indexName: 'SymbolIndex',
      partitionKey: { name: 'stock_symbol', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      projectionType: dynamodb.ProjectionType.ALL,
    });

    // Secret for Twitter API credentials
    const twitterApiSecret = secretsmanager.Secret.fromSecretNameV2(
      this,
      'TwitterApiSecret',
      'prod/twitter-api-credentials'
    );

    // Lambda for Twitter stream collection
    const tweetCollectorFn = new lambda.Function(this, 'TweetCollector', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/tweet-collector'),
      timeout: cdk.Duration.minutes(15),
      memorySize: 1024,
      environment: {
        KINESIS_STREAM_NAME: this.tweetStream.streamName,
        TWEET_CACHE_TABLE: this.tweetCache.tableName,
        STOCK_WATCHLIST: JSON.stringify(props.stockWatchlist),
        TWITTER_API_SECRET_NAME: twitterApiSecret.secretName,
      },
      reservedConcurrentExecutions: 1, // Single instance for streaming
    });

    // Grant permissions
    this.tweetStream.grantWrite(tweetCollectorFn);
    this.tweetCache.grantWriteData(tweetCollectorFn);
    twitterApiSecret.grantRead(tweetCollectorFn);

    // Keep-alive rule to maintain Twitter stream connection
    const keepAliveRule = new events.Rule(this, 'StreamKeepAlive', {
      schedule: events.Schedule.rate(cdk.Duration.minutes(5)),
      description: 'Keep Twitter stream connection alive',
    });

    keepAliveRule.addTarget(new targets.LambdaFunction(tweetCollectorFn));

    // CloudWatch alarms for stream health
    this.tweetStream.metricGetRecordsSuccess().createAlarm(this, 'StreamHealthAlarm', {
      threshold: 1,
      evaluationPeriods: 3,
      comparisonOperator: cdk.aws_cloudwatch.ComparisonOperator.LESS_THAN_THRESHOLD,
      treatMissingData: cdk.aws_cloudwatch.TreatMissingData.BREACHING,
    });
  }
}
```
Tweet Collection Lambda Implementation
```python
# lib/lambda/tweet-collector/index.py
import json
import os
import boto3
import tweepy
from datetime import datetime
from typing import List, Dict

kinesis_client = boto3.client('kinesis')
dynamodb = boto3.resource('dynamodb')
secrets_client = boto3.client('secretsmanager')

KINESIS_STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
TWEET_CACHE_TABLE = os.environ['TWEET_CACHE_TABLE']
STOCK_WATCHLIST = json.loads(os.environ['STOCK_WATCHLIST'])
TWITTER_API_SECRET_NAME = os.environ['TWITTER_API_SECRET_NAME']

# Global stream listener (reused across warm Lambda invocations)
stream_listener = None

class SentimentStreamListener(tweepy.StreamingClient):
    """Custom Twitter stream listener for stock mentions"""

    def __init__(self, bearer_token, stock_watchlist):
        super().__init__(bearer_token, wait_on_rate_limit=True)
        self.stock_watchlist = stock_watchlist
        self.tweet_cache_table = dynamodb.Table(TWEET_CACHE_TABLE)
        # StreamingClient has no user-lookup methods, so keep a
        # separate REST client for author profile lookups
        self.rest_client = tweepy.Client(bearer_token, wait_on_rate_limit=True)

    def on_tweet(self, tweet):
        """Process incoming tweets"""
        try:
            # Extract stock symbols mentioned
            mentioned_stocks = self.extract_stock_symbols(tweet.text)

            if not mentioned_stocks:
                return

            # Get author info
            author_info = self.get_author_info(tweet.author_id)

            # Calculate influence score
            influence_score = self.calculate_influence_score(
                author_info,
                mentioned_stocks
            )

            # Prepare tweet data
            tweet_data = {
                'tweet_id': str(tweet.id),
                'author_id': str(tweet.author_id),
                'author_username': author_info['username'],
                'author_followers': author_info['followers_count'],
                'author_verified': author_info.get('verified', False),
                'influence_score': influence_score,
                'text': tweet.text,
                'mentioned_stocks': mentioned_stocks,
                'created_at': tweet.created_at.isoformat(),
                'timestamp': int(tweet.created_at.timestamp()),
                'engagement': {
                    'retweet_count': tweet.public_metrics.get('retweet_count', 0),
                    'reply_count': tweet.public_metrics.get('reply_count', 0),
                    'like_count': tweet.public_metrics.get('like_count', 0),
                    'quote_count': tweet.public_metrics.get('quote_count', 0),
                },
            }

            # Send to Kinesis for processing
            self.send_to_kinesis(tweet_data)

            # Cache in DynamoDB for deduplication
            self.cache_tweet(tweet_data)

            print(f"Processed tweet {tweet.id} mentioning {mentioned_stocks}")

        except Exception as e:
            print(f"Error processing tweet: {e}")

    def extract_stock_symbols(self, text: str) -> List[str]:
        """Extract stock symbols from tweet text"""
        mentioned = []

        for stock in self.stock_watchlist:
            symbol = stock['symbol']
            keywords = stock.get('keywords', [])

            # Check for $SYMBOL cashtags
            if f"${symbol}" in text.upper():
                mentioned.append(symbol)
                continue

            # Check for keywords
            text_upper = text.upper()
            for keyword in keywords:
                if keyword.upper() in text_upper:
                    mentioned.append(symbol)
                    break

        return list(set(mentioned))  # Remove duplicates

    def get_author_info(self, author_id: str) -> Dict:
        """Fetch author information from Twitter API"""
        # This would be cached in practice
        try:
            user = self.rest_client.get_user(
                id=author_id, user_fields=['public_metrics', 'verified']
            )
            return {
                'username': user.data.username,
                'followers_count': user.data.public_metrics['followers_count'],
                'verified': user.data.verified,
            }
        except Exception:
            return {
                'username': 'unknown',
                'followers_count': 0,
                'verified': False,
            }

    def calculate_influence_score(self, author_info: Dict, mentioned_stocks: List[str]) -> float:
        """Calculate tweet influence score based on author and context"""

        score = 0.5  # Base score

        # Follower count influence (stepped scale)
        followers = author_info['followers_count']
        if followers > 1000000:
            score += 0.3
        elif followers > 100000:
            score += 0.2
        elif followers > 10000:
            score += 0.1

        # Verified account bonus
        if author_info.get('verified'):
            score += 0.1

        # Key influencer bonus
        username = author_info['username'].lower()
        for stock in self.stock_watchlist:
            if stock['symbol'] in mentioned_stocks:
                key_influencers = [inf.lower() for inf in stock.get('key_influencers', [])]
                if username in key_influencers:
                    score += 0.3
                    break

        return min(score, 1.0)  # Cap at 1.0

    def send_to_kinesis(self, tweet_data: Dict):
        """Send tweet data to Kinesis stream"""
        try:
            kinesis_client.put_record(
                StreamName=KINESIS_STREAM_NAME,
                Data=json.dumps(tweet_data),
                PartitionKey=tweet_data['tweet_id']
            )
        except Exception as e:
            print(f"Error sending to Kinesis: {e}")

    def cache_tweet(self, tweet_data: Dict):
        """Cache tweet in DynamoDB for deduplication"""
        try:
            # Add TTL (24 hours)
            ttl = int(datetime.utcnow().timestamp()) + (24 * 60 * 60)

            # Store for each mentioned stock
            for symbol in tweet_data['mentioned_stocks']:
                self.tweet_cache_table.put_item(Item={
                    'tweet_id': tweet_data['tweet_id'],
                    'timestamp': tweet_data['timestamp'],
                    'stock_symbol': symbol,
                    'author_username': tweet_data['author_username'],
                    'influence_score': tweet_data['influence_score'],
                    'text': tweet_data['text'],
                    'ttl': ttl,
                })
        except Exception as e:
            print(f"Error caching tweet: {e}")

    def on_errors(self, errors):
        print(f"Twitter API errors: {errors}")

def handler(event, context):
    """Lambda handler to maintain Twitter stream connection"""
    global stream_listener

    # Get Twitter API credentials
    secret = secrets_client.get_secret_value(SecretId=TWITTER_API_SECRET_NAME)
    credentials = json.loads(secret['SecretString'])
    bearer_token = credentials['bearer_token']

    # Initialize stream listener if not exists
    if stream_listener is None:
        stream_listener = SentimentStreamListener(bearer_token, STOCK_WATCHLIST)

    # Build filter rules for all stocks
    rules = []
    for stock in STOCK_WATCHLIST:
        symbol = stock['symbol']
        # Track cashtag and keywords
        rules.append(f"${symbol}")
        for keyword in stock.get('keywords', [])[:3]:  # Limit keywords
            rules.append(keyword)

    # Delete existing rules
    existing_rules = stream_listener.get_rules()
    if existing_rules.data:
        rule_ids = [rule.id for rule in existing_rules.data]
        stream_listener.delete_rules(rule_ids)

    # Add new rules
    for rule in rules[:25]:  # Twitter API rule limit
        stream_listener.add_rules(tweepy.StreamRule(rule))

    # Start streaming (non-blocking)
    try:
        stream_listener.filter(
            tweet_fields=['author_id', 'created_at', 'public_metrics', 'text'],
            user_fields=['username', 'public_metrics', 'verified'],
            threaded=True
        )
        return {
            'statusCode': 200,
            'body': 'Twitter stream active'
        }
    except Exception as e:
        print(f"Stream error: {e}")
        return {
            'statusCode': 500,
            'body': f'Stream error: {str(e)}'
        }
```
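One caveat: `extract_stock_symbols` uses plain substring matching, so generic keywords like "Model" or "AI" will match unrelated tweets. A hedged sketch of a stricter extractor that only accepts true `$CASHTAG` tokens and whole-word keyword matches (the function name and regex are illustrative, not part of the Lambda above):

```python
import re
from typing import Dict, List

CASHTAG_RE = re.compile(r'\$([A-Z]{1,5})\b')

def extract_symbols_strict(text: str, watchlist: List[Dict]) -> List[str]:
    """Match $CASHTAG tokens exactly and keywords only at word boundaries."""
    symbols = {s['symbol'] for s in watchlist}
    found = {tag for tag in CASHTAG_RE.findall(text.upper()) if tag in symbols}

    for stock in watchlist:
        for keyword in stock.get('keywords', []):
            # \b prevents "Google" from matching inside "googled", etc.
            if re.search(rf'\b{re.escape(keyword)}\b', text, re.IGNORECASE):
                found.add(stock['symbol'])
                break
    return sorted(found)

# Matches TSLA via cashtag; "googled" does NOT trigger GOOG's "Google" keyword
print(extract_symbols_strict(
    "$TSLA beats estimates. I googled the specs.",
    [{'symbol': 'TSLA', 'keywords': ['Tesla']},
     {'symbol': 'GOOG', 'keywords': ['Google']}]
))  # -> ['TSLA']
```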
2. Sentiment Analysis Layer
The sentiment analysis construct processes tweets using multiple ML models:
```typescript
// lib/constructs/sentiment-analysis-construct.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
import { Construct } from 'constructs';

export interface SentimentAnalysisProps {
  tweetStream: kinesis.IStream;
  enabledModels: string[]; // ['bedrock', 'huggingface', 'comprehend']
}

export class SentimentAnalysisConstruct extends Construct {
  public readonly sentimentTable: dynamodb.Table;

  constructor(scope: Construct, id: string, props: SentimentAnalysisProps) {
    super(scope, id);

    // DynamoDB table for sentiment scores
    this.sentimentTable = new dynamodb.Table(this, 'SentimentScores', {
      partitionKey: { name: 'stock_symbol', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      timeToLiveAttribute: 'ttl',
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    });

    // GSI for querying recent sentiment
    this.sentimentTable.addGlobalSecondaryIndex({
      indexName: 'RecentSentimentIndex',
      partitionKey: { name: 'stock_symbol', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'sentiment_score', type: dynamodb.AttributeType.NUMBER },
      projectionType: dynamodb.ProjectionType.ALL,
    });

    // Lambda Layer for NLP libraries
    const nlpLayer = new lambda.LayerVersion(this, 'NLPLibrariesLayer', {
      code: lambda.Code.fromAsset('lib/lambda/layers/nlp-libs'),
      compatibleRuntimes: [lambda.Runtime.PYTHON_3_11],
      description: 'NLP libraries: transformers, torch, nltk',
    });

    // Sentiment Analysis Lambda
    const sentimentAnalyzerFn = new lambda.Function(this, 'SentimentAnalyzer', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/sentiment-analyzer'),
      timeout: cdk.Duration.seconds(300),
      memorySize: 3008, // Max memory for NLP models
      layers: [nlpLayer],
      environment: {
        SENTIMENT_TABLE: this.sentimentTable.tableName,
        ENABLED_MODELS: JSON.stringify(props.enabledModels),
        BEDROCK_MODEL_ID: 'anthropic.claude-3-sonnet-20240229-v1:0',
      },
      reservedConcurrentExecutions: 10, // Limit concurrent executions
    });

    // Grant permissions
    this.sentimentTable.grantWriteData(sentimentAnalyzerFn);

    // Bedrock access
    if (props.enabledModels.includes('bedrock')) {
      sentimentAnalyzerFn.addToRolePolicy(new iam.PolicyStatement({
        actions: ['bedrock:InvokeModel'],
        resources: ['*'],
      }));
    }

    // Comprehend access
    if (props.enabledModels.includes('comprehend')) {
      sentimentAnalyzerFn.addToRolePolicy(new iam.PolicyStatement({
        actions: [
          'comprehend:DetectSentiment',
          'comprehend:DetectEntities',
        ],
        resources: ['*'],
      }));
    }

    // Connect to Kinesis stream
    sentimentAnalyzerFn.addEventSource(
      new lambdaEventSources.KinesisEventSource(props.tweetStream, {
        batchSize: 10,
        startingPosition: lambda.StartingPosition.LATEST,
        retryAttempts: 3,
        parallelizationFactor: 5,
      })
    );

    // Signal Aggregator Lambda (DynamoDB Stream trigger)
    const signalAggregatorFn = new lambda.Function(this, 'SignalAggregator', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/signal-aggregator'),
      timeout: cdk.Duration.seconds(60),
      memorySize: 1024,
      environment: {
        SENTIMENT_TABLE: this.sentimentTable.tableName,
      },
    });

    this.sentimentTable.grantReadWriteData(signalAggregatorFn);

    // Trigger on sentiment updates
    signalAggregatorFn.addEventSource(
      new lambdaEventSources.DynamoEventSource(this.sentimentTable, {
        startingPosition: lambda.StartingPosition.LATEST,
        batchSize: 100,
        retryAttempts: 2,
      })
    );
  }
}
```
Sentiment Analysis Lambda Implementation
```python
# lib/lambda/sentiment-analyzer/index.py
import json
import os
import boto3
import base64
from datetime import datetime
from typing import Dict
from decimal import Decimal

bedrock_runtime = boto3.client('bedrock-runtime')
comprehend_client = boto3.client('comprehend')
dynamodb = boto3.resource('dynamodb')

SENTIMENT_TABLE = os.environ['SENTIMENT_TABLE']
ENABLED_MODELS = json.loads(os.environ['ENABLED_MODELS'])
BEDROCK_MODEL_ID = os.environ['BEDROCK_MODEL_ID']

def handler(event, context):
    """Analyze sentiment from Kinesis stream records"""

    processed = 0
    failed = 0

    for record in event['Records']:
        try:
            # Decode tweet data
            payload = base64.b64decode(record['kinesis']['data'])
            tweet_data = json.loads(payload)

            # Analyze sentiment using multiple models
            sentiment_results = analyze_tweet_sentiment(tweet_data)

            # Store sentiment scores
            store_sentiment_scores(tweet_data, sentiment_results)

            processed += 1

        except Exception as e:
            print(f"Error processing record: {e}")
            failed += 1

    return {
        'statusCode': 200,
        'processed': processed,
        'failed': failed
    }

def analyze_tweet_sentiment(tweet_data: Dict) -> Dict:
    """Analyze sentiment using multiple models"""

    text = tweet_data['text']
    results = {}

    # AWS Comprehend
    if 'comprehend' in ENABLED_MODELS:
        results['comprehend'] = analyze_with_comprehend(text)

    # AWS Bedrock (Claude)
    if 'bedrock' in ENABLED_MODELS:
        results['bedrock'] = analyze_with_bedrock(text, tweet_data)

    # HuggingFace FinBERT (if deployed)
    if 'huggingface' in ENABLED_MODELS:
        results['huggingface'] = analyze_with_finbert(text)

    # Ensemble the results
    ensemble_sentiment = ensemble_sentiment_scores(results)

    return {
        'individual': results,
        'ensemble': ensemble_sentiment,
        'confidence': calculate_confidence(results)
    }

def analyze_with_comprehend(text: str) -> Dict:
    """Analyze sentiment using AWS Comprehend"""
    try:
        response = comprehend_client.detect_sentiment(
            Text=text,
            LanguageCode='en'
        )

        sentiment = response['Sentiment']
        scores = response['SentimentScore']

        # Convert to normalized score (-1 to 1)
        if sentiment == 'POSITIVE':
            score = scores['Positive']
        elif sentiment == 'NEGATIVE':
            score = -scores['Negative']
        elif sentiment == 'NEUTRAL':
            score = 0
        else:  # MIXED
            score = scores['Positive'] - scores['Negative']

        return {
            'model': 'comprehend',
            'sentiment': sentiment,
            'score': score,
            'confidence': max(scores.values()),
        }

    except Exception as e:
        print(f"Comprehend error: {e}")
        return None

def analyze_with_bedrock(text: str, tweet_data: Dict) -> Dict:
    """Analyze sentiment using AWS Bedrock Claude"""

    author = tweet_data['author_username']
    influence = tweet_data['influence_score']
    stocks = tweet_data['mentioned_stocks']

    prompt = f"""You are a financial sentiment analyst. Analyze the following tweet about stocks {', '.join(stocks)}.

Tweet: "{text}"
Author: @{author} (Influence score: {influence:.2f})

Provide sentiment analysis in JSON format:
{{
  "sentiment": "<positive|negative|neutral>",
  "score": <-1.0 to 1.0>,
  "confidence": <0.0 to 1.0>,
  "market_impact": "<high|medium|low>",
  "reasoning": "<brief explanation>",
  "key_factors": ["<factor1>", "<factor2>"]
}}

Consider:
1. Financial context and terminology
2. Author's influence and credibility
3. Specific stock mentions and context
4. Market timing and relevance"""

    try:
        response = bedrock_runtime.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 500,
                "messages": [{
                    "role": "user",
                    "content": prompt
                }]
            })
        )

        result = json.loads(response['body'].read())
        # Assumes the model returned pure JSON, as instructed in the prompt
        analysis = json.loads(result['content'][0]['text'])

        return {
            'model': 'bedrock-claude',
            'sentiment': analysis['sentiment'],
            'score': analysis['score'],
            'confidence': analysis['confidence'],
            'market_impact': analysis['market_impact'],
            'reasoning': analysis['reasoning'],
            'key_factors': analysis['key_factors'],
        }

    except Exception as e:
        print(f"Bedrock error: {e}")
        return None

def analyze_with_finbert(text: str) -> Dict:
    """Analyze sentiment using FinBERT model"""
    # This would call a SageMaker endpoint with FinBERT
    # Placeholder implementation
    try:
        # In production, this would invoke a SageMaker endpoint;
        # for now, return mock data
        return {
            'model': 'finbert',
            'sentiment': 'positive',
            'score': 0.65,
            'confidence': 0.75,
        }
    except Exception as e:
        print(f"FinBERT error: {e}")
        return None

def ensemble_sentiment_scores(results: Dict) -> Dict:
    """Ensemble multiple sentiment scores with weighting"""

    valid_results = [r for r in results.values() if r is not None]

    if not valid_results:
        return None

    # Weight models differently
    model_weights = {
        'bedrock-claude': 0.4,
        'finbert': 0.35,
        'comprehend': 0.25,
    }

    total_weight = 0
    weighted_score = 0

    for result in valid_results:
        model = result['model']
        weight = model_weights.get(model, 0.33)
        weighted_score += result['score'] * weight * result['confidence']
        total_weight += weight * result['confidence']

    final_score = weighted_score / total_weight if total_weight > 0 else 0

    # Determine sentiment label
    if final_score > 0.2:
        sentiment = 'POSITIVE'
    elif final_score < -0.2:
        sentiment = 'NEGATIVE'
    else:
        sentiment = 'NEUTRAL'

    return {
        'sentiment': sentiment,
        'score': final_score,
        'num_models': len(valid_results),
    }

def calculate_confidence(results: Dict) -> float:
    """Calculate confidence based on model agreement"""

    valid_results = [r for r in results.values() if r is not None]

    if not valid_results:
        return 0.0

    # Check sentiment agreement
    sentiments = [r['sentiment'].upper() for r in valid_results]
    most_common = max(set(sentiments), key=sentiments.count)
    agreement_ratio = sentiments.count(most_common) / len(sentiments)

    # Average confidence
    avg_confidence = sum(r['confidence'] for r in valid_results) / len(valid_results)

    return agreement_ratio * avg_confidence

def store_sentiment_scores(tweet_data: Dict, sentiment_results: Dict):
    """Store sentiment scores in DynamoDB"""

    table = dynamodb.Table(SENTIMENT_TABLE)
    timestamp = int(datetime.utcnow().timestamp())

    # Store for each mentioned stock
    for symbol in tweet_data['mentioned_stocks']:
        ensemble = sentiment_results['ensemble']

        if ensemble is None:
            continue

        table.put_item(Item={
            'stock_symbol': symbol,
            'timestamp': timestamp,
            'tweet_id': tweet_data['tweet_id'],
            'author_username': tweet_data['author_username'],
            'influence_score': Decimal(str(tweet_data['influence_score'])),
            'sentiment': ensemble['sentiment'],
            'sentiment_score': Decimal(str(ensemble['score'])),
            'confidence': Decimal(str(sentiment_results['confidence'])),
            'text': tweet_data['text'],
            'engagement': tweet_data['engagement'],
            'individual_models': json.dumps(sentiment_results['individual'], default=str),
            'ttl': timestamp + (24 * 60 * 60),  # 24 hour retention
        })
```
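In a real deployment, the `analyze_with_finbert` stub above would call a SageMaker endpoint hosting FinBERT. A minimal sketch of that call — the endpoint name `finbert-sentiment` and the `{"label": ..., "score": ...}` response shape are assumptions, not defined by the stack above:

```python
import json
import boto3

sagemaker_runtime = boto3.client('sagemaker-runtime')

# Hypothetical endpoint name and response contract; adjust to your deployment
FINBERT_ENDPOINT = 'finbert-sentiment'

def analyze_with_finbert(text: str):
    """Invoke a SageMaker-hosted FinBERT model for financial sentiment."""
    try:
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName=FINBERT_ENDPOINT,
            ContentType='application/json',
            Body=json.dumps({'inputs': text}),
        )
        prediction = json.loads(response['Body'].read())
        # Assumed shape: {"label": "positive|negative|neutral", "score": 0.0-1.0}
        label = prediction['label']
        signed = prediction['score'] if label == 'positive' else (
            -prediction['score'] if label == 'negative' else 0.0)
        return {
            'model': 'finbert',
            'sentiment': label,
            'score': signed,
            'confidence': prediction['score'],
        }
    except Exception as e:
        print(f"FinBERT endpoint error: {e}")
        return None
```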
Signal Aggregator Lambda
```python
# lib/lambda/signal-aggregator/index.py
import json
import os
import boto3
from datetime import datetime, timedelta
from typing import Dict, Optional
from collections import defaultdict

dynamodb = boto3.resource('dynamodb')
eventbridge_client = boto3.client('events')

SENTIMENT_TABLE = os.environ['SENTIMENT_TABLE']

def handler(event, context):
    """Aggregate sentiment signals and detect trading opportunities"""

    # Group sentiment updates by stock
    stock_updates = defaultdict(list)

    for record in event['Records']:
        if record['eventName'] in ['INSERT', 'MODIFY']:
            new_image = record['dynamodb']['NewImage']
            stock_symbol = new_image['stock_symbol']['S']
            stock_updates[stock_symbol].append(new_image)

    # Analyze each stock that received updates
    trading_signals = []

    for symbol in stock_updates:
        signal = analyze_stock_sentiment(symbol)
        if signal:
            trading_signals.append(signal)

    # Emit trading signals to EventBridge
    for signal in trading_signals:
        emit_trading_signal(signal)

    return {
        'statusCode': 200,
        'signals_generated': len(trading_signals)
    }

def analyze_stock_sentiment(symbol: str) -> Optional[Dict]:
    """Analyze aggregated sentiment for a stock"""

    table = dynamodb.Table(SENTIMENT_TABLE)

    # Query recent sentiment (last 30 minutes)
    start_time = int((datetime.utcnow() - timedelta(minutes=30)).timestamp())

    response = table.query(
        KeyConditionExpression='stock_symbol = :symbol AND #ts >= :start_time',
        ExpressionAttributeNames={'#ts': 'timestamp'},
        ExpressionAttributeValues={
            ':symbol': symbol,
            ':start_time': start_time,
        }
    )

    items = response['Items']

    if len(items) < 10:  # Minimum tweet volume
        return None

    # Calculate weighted sentiment score
    total_score = 0
    total_weight = 0

    for item in items:
        score = float(item['sentiment_score'])
        influence = float(item['influence_score'])
        confidence = float(item['confidence'])

        # Time decay: recent tweets have more weight
        # (DynamoDB numbers come back as Decimal, so cast before float math)
        age_minutes = (datetime.utcnow().timestamp() - float(item['timestamp'])) / 60
        time_weight = max(0.1, 1 - (age_minutes / 30))

        weight = influence * confidence * time_weight
        total_score += score * weight
        total_weight += weight

    if total_weight == 0:
        return None

    avg_sentiment = total_score / total_weight

    # Calculate sentiment change (compare to 1 hour ago)
    previous_sentiment = get_previous_sentiment(symbol, hours=1)
    sentiment_change = (avg_sentiment - previous_sentiment
                        if previous_sentiment is not None else 0)

    # Determine if this is a trading signal
    if abs(sentiment_change) < 0.15:  # Minimum change threshold
        return None

    return {
        'stock_symbol': symbol,
        'current_sentiment': avg_sentiment,
        'previous_sentiment': previous_sentiment,
        'sentiment_change': sentiment_change,
        'direction': 'BUY' if sentiment_change > 0 else 'SELL',
        'tweet_volume': len(items),
        'confidence': total_weight / len(items),
        'timestamp': int(datetime.utcnow().timestamp()),
    }

def get_previous_sentiment(symbol: str, hours: int = 1) -> Optional[float]:
    """Get average sentiment from a previous time period (None if no data)"""

    table = dynamodb.Table(SENTIMENT_TABLE)

    end_time = int((datetime.utcnow() - timedelta(hours=hours)).timestamp())
    start_time = end_time - (30 * 60)  # 30 minute window

    response = table.query(
        KeyConditionExpression='stock_symbol = :symbol AND #ts BETWEEN :start AND :end',
        ExpressionAttributeNames={'#ts': 'timestamp'},
        ExpressionAttributeValues={
            ':symbol': symbol,
            ':start': start_time,
            ':end': end_time,
        }
    )

    items = response['Items']

    if not items:
        return None

    return sum(float(item['sentiment_score']) for item in items) / len(items)

def emit_trading_signal(signal: Dict):
    """Emit trading signal to EventBridge"""

    try:
        eventbridge_client.put_events(
            Entries=[{
                'Source': 'sentiment.trading',
                'DetailType': 'TradingSignal',
                'Detail': json.dumps(signal, default=str),
                'EventBusName': 'default',
            }]
        )
        print(f"Emitted trading signal for {signal['stock_symbol']}: {signal['direction']}")
    except Exception as e:
        print(f"Error emitting signal: {e}")
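```

To make the time-decay weighting concrete, here is the same arithmetic replayed as a standalone snippet with synthetic numbers (not data from the live table):

```python
# Standalone illustration of the aggregator's weighting; inputs are made up.
tweets = [
    # (sentiment_score, influence, confidence, age_minutes)
    ( 0.8, 0.9, 0.85,  2),   # fresh, high-influence bullish tweet
    ( 0.4, 0.5, 0.70, 15),   # mid-window, average account
    (-0.6, 0.6, 0.80, 28),   # old bearish tweet, nearly decayed out
]

total_score = total_weight = 0.0
for score, influence, confidence, age in tweets:
    time_weight = max(0.1, 1 - age / 30)     # linear decay over the 30-min window
    weight = influence * confidence * time_weight
    total_score += score * weight
    total_weight += weight

print(f"weighted sentiment: {total_score / total_weight:+.3f}")
# The fresh bullish tweet dominates: its weight is 0.9*0.85*0.933 ≈ 0.714,
# versus ≈ 0.048 for the nearly expired bearish one.
```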
3. Trading Engine with Portfolio Management
```typescript
// lib/constructs/trading-engine-construct.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
import { Construct } from 'constructs';

export interface TradingEngineProps {
  tradingRules: any;
  stockWatchlist: any[];
}

export class TradingEngineConstruct extends Construct {
  public readonly portfolioTable: dynamodb.Table;
  public readonly tradeHistoryTable: dynamodb.Table;
  public readonly alertTopic: sns.Topic;

  constructor(scope: Construct, id: string, props: TradingEngineProps) {
    super(scope, id);

    // Portfolio state table
    this.portfolioTable = new dynamodb.Table(this, 'Portfolio', {
      partitionKey: { name: 'stock_symbol', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    });

    // Trade history table
    this.tradeHistoryTable = new dynamodb.Table(this, 'TradeHistory', {
      partitionKey: { name: 'trade_id', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    });

    // Add GSI for querying by stock
    this.tradeHistoryTable.addGlobalSecondaryIndex({
      indexName: 'StockTradesIndex',
      partitionKey: { name: 'stock_symbol', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
    });

    // SNS topic for alerts
    this.alertTopic = new sns.Topic(this, 'TradingAlerts', {
      displayName: 'Sentiment Trading Alerts',
    });

    // Alpaca API credentials secret
    const alpacaApiSecret = secretsmanager.Secret.fromSecretNameV2(
      this,
      'AlpacaApiSecret',
      'prod/alpaca-api-credentials'
    );

    // Risk Manager Lambda
    const riskManagerFn = new lambda.Function(this, 'RiskManager', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/risk-manager'),
      timeout: cdk.Duration.seconds(30),
      environment: {
        PORTFOLIO_TABLE: this.portfolioTable.tableName,
        TRADE_HISTORY_TABLE: this.tradeHistoryTable.tableName,
        MAX_PORTFOLIO_VALUE: props.tradingRules.max_total_portfolio.toString(),
        MAX_DAILY_TRADES: props.tradingRules.max_daily_trades_per_stock?.toString() || '5',
        STOCK_WATCHLIST: JSON.stringify(props.stockWatchlist),
      },
    });

    this.portfolioTable.grantReadData(riskManagerFn);
    this.tradeHistoryTable.grantReadData(riskManagerFn);

    // Trade Executor Lambda
    const tradeExecutorFn = new lambda.Function(this, 'TradeExecutor', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/trade-executor'),
      timeout: cdk.Duration.seconds(60),
      environment: {
        PORTFOLIO_TABLE: this.portfolioTable.tableName,
        TRADE_HISTORY_TABLE: this.tradeHistoryTable.tableName,
        ALERT_TOPIC_ARN: this.alertTopic.topicArn,
        ENABLE_REAL_TRADING: props.tradingRules.enable_real_trading.toString(),
        ALPACA_API_SECRET_NAME: alpacaApiSecret.secretName,
        MARKET_HOURS_ONLY: props.tradingRules.market_hours_only?.toString() || 'true',
      },
    });

    this.portfolioTable.grantReadWriteData(tradeExecutorFn);
    this.tradeHistoryTable.grantWriteData(tradeExecutorFn);
    this.alertTopic.grantPublish(tradeExecutorFn);
    alpacaApiSecret.grantRead(tradeExecutorFn);

    // Step Functions workflow
    const checkRisk = new tasks.LambdaInvoke(this, 'CheckRisk', {
      lambdaFunction: riskManagerFn,
      outputPath: '$.Payload',
    });

    const executeTrade = new tasks.LambdaInvoke(this, 'ExecuteTrade', {
      lambdaFunction: tradeExecutorFn,
      outputPath: '$.Payload',
    });

    const riskApproved = new sfn.Choice(this, 'RiskApproved')
      .when(
        sfn.Condition.booleanEquals('$.risk_approved', true),
        executeTrade
      )
      .otherwise(new sfn.Succeed(this, 'TradeRejected'));

    const definition = checkRisk.next(riskApproved);

    const tradingStateMachine = new sfn.StateMachine(this, 'TradingWorkflow', {
      definition,
      timeout: cdk.Duration.minutes(5),
    });

    // EventBridge rule to trigger on trading signals
    new events.Rule(this, 'TradingSignalRule', {
      eventPattern: {
        source: ['sentiment.trading'],
        detailType: ['TradingSignal'],
      },
      targets: [new targets.SfnStateMachine(tradingStateMachine)],
    });
  }
}
```
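The `risk-manager` Lambda wired into the workflow above is referenced but not shown. A hedged sketch of what its handler might check, based on the environment variables the construct passes in — the specific checks are illustrative, not prescriptive:

```python
# lib/lambda/risk-manager/index.py -- illustrative sketch
import os
from datetime import datetime, timedelta
import boto3

dynamodb = boto3.resource('dynamodb')

PORTFOLIO_TABLE = os.environ['PORTFOLIO_TABLE']
TRADE_HISTORY_TABLE = os.environ['TRADE_HISTORY_TABLE']
MAX_PORTFOLIO_VALUE = float(os.environ['MAX_PORTFOLIO_VALUE'])
MAX_DAILY_TRADES = int(os.environ['MAX_DAILY_TRADES'])

def handler(event, context):
    """Approve or reject a trading signal before execution."""
    signal = event.get('detail', event)
    symbol = signal['stock_symbol']

    # 1. Daily trade-count limit per stock (queries the StockTradesIndex GSI)
    history = dynamodb.Table(TRADE_HISTORY_TABLE)
    day_start = int((datetime.utcnow() - timedelta(hours=24)).timestamp())
    trades_today = history.query(
        IndexName='StockTradesIndex',
        KeyConditionExpression='stock_symbol = :s AND #ts >= :t',
        ExpressionAttributeNames={'#ts': 'timestamp'},
        ExpressionAttributeValues={':s': symbol, ':t': day_start},
    )['Count']
    if trades_today >= MAX_DAILY_TRADES:
        return {**signal, 'risk_approved': False,
                'risk_reason': f'Daily trade limit reached for {symbol}'}

    # 2. Total portfolio exposure cap (fine at watchlist scale; avoid scans at volume)
    portfolio = dynamodb.Table(PORTFOLIO_TABLE).scan()['Items']
    total_value = sum(float(p.get('total_value', 0)) for p in portfolio)
    if total_value >= MAX_PORTFOLIO_VALUE:
        return {**signal, 'risk_approved': False,
                'risk_reason': 'Portfolio value cap reached'}

    return {**signal, 'risk_approved': True}
```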
Trade Executor Implementation
```python
# lib/lambda/trade-executor/index.py
import json
import os
import uuid
import boto3
from datetime import datetime
from decimal import Decimal
from typing import Optional
from zoneinfo import ZoneInfo
import alpaca_trade_api as tradeapi

dynamodb = boto3.resource('dynamodb')
sns_client = boto3.client('sns')
secrets_client = boto3.client('secretsmanager')

PORTFOLIO_TABLE = os.environ['PORTFOLIO_TABLE']
TRADE_HISTORY_TABLE = os.environ['TRADE_HISTORY_TABLE']
ALERT_TOPIC_ARN = os.environ['ALERT_TOPIC_ARN']
ENABLE_REAL_TRADING = os.environ['ENABLE_REAL_TRADING'].lower() == 'true'
ALPACA_API_SECRET_NAME = os.environ['ALPACA_API_SECRET_NAME']
MARKET_HOURS_ONLY = os.environ.get('MARKET_HOURS_ONLY', 'true').lower() == 'true'

# Alpaca API client (initialized lazily, reused across warm invocations)
alpaca_api = None

def get_alpaca_client():
    """Initialize Alpaca API client"""
    global alpaca_api

    if alpaca_api is None:
        secret = secrets_client.get_secret_value(SecretId=ALPACA_API_SECRET_NAME)
        credentials = json.loads(secret['SecretString'])

        alpaca_api = tradeapi.REST(
            credentials['api_key'],
            credentials['api_secret'],
            credentials['base_url'],  # Paper or live trading URL
            api_version='v2'
        )

    return alpaca_api

def handler(event, context):
    """Execute trade based on sentiment signal and risk approval"""

    signal = event.get('detail', event)

    if not event.get('risk_approved', False):
        return {
            'trade_executed': False,
            'reason': event.get('risk_reason', 'Risk check failed')
        }

    stock_symbol = signal['stock_symbol']
    direction = signal['direction']
    sentiment_change = signal['sentiment_change']
    confidence = signal['confidence']

    # Check market hours
    if MARKET_HOURS_ONLY and not is_market_open():
        return {
            'trade_executed': False,
            'reason': 'Market is closed'
        }

    # Get current stock price
    current_price = get_current_price(stock_symbol)

    if not current_price:
        return {
            'trade_executed': False,
            'reason': 'Unable to fetch current price'
        }

    # Calculate position size based on sentiment strength
    position_size = calculate_position_size(
        stock_symbol,
        sentiment_change,
        confidence,
        current_price
    )

    if position_size == 0:
        return {
            'trade_executed': False,
            'reason': 'Position size too small'
        }

    # Execute trade
    if ENABLE_REAL_TRADING:
        trade_result = execute_alpaca_trade(
            stock_symbol,
            direction,
            position_size,
            current_price
        )
    else:
        trade_result = simulate_trade(
            stock_symbol,
            direction,
            position_size,
            current_price
        )

    # Update portfolio
    update_portfolio(stock_symbol, direction, position_size, current_price)

    # Log trade
    log_trade(signal, trade_result, position_size, current_price)

    # Send alert
    send_trade_alert(stock_symbol, direction, position_size, current_price, trade_result)

    return {
        'trade_executed': True,
        'stock_symbol': stock_symbol,
        'direction': direction,
        'quantity': position_size,
        'price': current_price,
        'trade_id': trade_result['trade_id'],
    }

def is_market_open() -> bool:
    """Check if market is currently open"""
    try:
        api = get_alpaca_client()
        clock = api.get_clock()
        return clock.is_open
    except Exception:
        # Fallback: rough US market-hours check. Lambda clocks run in UTC,
        # so convert to Eastern time before comparing hours.
        now_et = datetime.now(ZoneInfo('America/New_York'))
        return now_et.weekday() < 5 and 9 <= now_et.hour < 16

def get_current_price(symbol: str) -> Optional[float]:
    """Get current stock price from Alpaca"""
    try:
        api = get_alpaca_client()
        quote = api.get_latest_trade(symbol)
        return float(quote.price)
    except Exception as e:
        print(f"Error fetching price for {symbol}: {e}")
        return None

def calculate_position_size(
    symbol: str,
    sentiment_change: float,
    confidence: float,
    price: float
) -> int:
    """Calculate number of shares to trade"""

    # Get current portfolio
    portfolio_table = dynamodb.Table(PORTFOLIO_TABLE)

    try:
        response = portfolio_table.get_item(Key={'stock_symbol': symbol})
        current_position = int(response.get('Item', {}).get('quantity', 0))
    except Exception:
        current_position = 0

    # Calculate base position size (as percentage of max allocation)
    signal_strength = abs(sentiment_change) * confidence
    max_position_value = 10000  # From config

    target_value = max_position_value * signal_strength
    target_shares = int(target_value / price)

    # Limit to reasonable position changes
    max_change = max(10, int(target_shares * 0.3))  # Max 30% change
    position_change = min(max_change, target_shares)

    return position_change

def execute_alpaca_trade(
    symbol: str,
    direction: str,
    quantity: int,
    price: float
) -> dict:
    """Execute real trade via Alpaca API"""

    try:
        api = get_alpaca_client()

        side = 'buy' if direction == 'BUY' else 'sell'

        order = api.submit_order(
            symbol=symbol,
            qty=quantity,
            side=side,
            type='market',
            time_in_force='day'
        )

        return {
            'trade_id': order.id,
            'status': order.status,
            'filled_price': float(order.filled_avg_price) if order.filled_avg_price else price,
            'simulated': False,
        }

    except Exception as e:
        print(f"Alpaca trade error: {e}")
        return {
            'trade_id': 'ERROR',
            'status': 'failed',
            'error': str(e),
            'simulated': False,
        }

def simulate_trade(
    symbol: str,
    direction: str,
    quantity: int,
    price: float
) -> dict:
    """Simulate trade for testing"""

    return {
        'trade_id': str(uuid.uuid4()),
        'status': 'filled',
        'filled_price': price,
        'simulated': True,
    }

def update_portfolio(symbol: str, direction: str, quantity: int, price: float):
    """Update portfolio state in DynamoDB"""

    table = dynamodb.Table(PORTFOLIO_TABLE)

    # Get current position
    try:
        response = table.get_item(Key={'stock_symbol': symbol})
        item = response.get('Item', {})
        current_qty = int(item.get('quantity', 0))
        current_value = float(item.get('total_value', 0))
    except Exception:
        current_qty = 0
        current_value = 0

    # Update position
    if direction == 'BUY':
        new_qty = current_qty + quantity
        new_value = current_value + (quantity * price)
    else:  # SELL
        new_qty = max(0, current_qty - quantity)
        new_value = max(0, current_value - (quantity * price))

    table.put_item(Item={
        'stock_symbol': symbol,
        'quantity': new_qty,
        'total_value': Decimal(str(new_value)),
        'avg_price': Decimal(str(new_value / new_qty)) if new_qty > 0 else Decimal('0'),
        'last_updated': int(datetime.utcnow().timestamp()),
    })

def log_trade(signal: dict, trade_result: dict, quantity: int, price: float):
    """Log trade to DynamoDB"""

    table = dynamodb.Table(TRADE_HISTORY_TABLE)
    timestamp = int(datetime.utcnow().timestamp())

    table.put_item(Item={
        'trade_id': trade_result['trade_id'],
        'timestamp': timestamp,
        'stock_symbol': signal['stock_symbol'],
        'direction': signal['direction'],
        'quantity': quantity,
        'price': Decimal(str(price)),
        'filled_price': Decimal(str(trade_result.get('filled_price', price))),
        'sentiment_change': Decimal(str(signal['sentiment_change'])),
        'confidence': Decimal(str(signal['confidence'])),
        'status': trade_result['status'],
        'simulated': trade_result.get('simulated', False),
    })

def send_trade_alert(symbol: str, direction: str, quantity: int, price: float, result: dict):
    """Send SNS notification"""

    total_value = quantity * price
    simulated_tag = '[SIMULATED]' if result.get('simulated') else '[REAL]'

    message = f"""
Sentiment-Driven Trade Executed {simulated_tag}

Stock: {symbol}
Direction: {direction}
Quantity: {quantity} shares
Price: ${price:.2f}
Total Value: ${total_value:.2f}
Trade ID: {result['trade_id']}
Status: {result['status']}
    """

    sns_client.publish(
        TopicArn=ALERT_TOPIC_ARN,
        Subject=f'Trade Alert: {direction} {symbol}',
        Message=message
    )
```
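To see what the sizing logic actually does, here is its arithmetic replayed with sample numbers (same formulas as `calculate_position_size`, including its hard-coded $10,000 cap):

```python
# Replays calculate_position_size() with sample inputs
sentiment_change, confidence, price = 0.30, 0.80, 200.00
max_position_value = 10_000                    # the function's hard-coded cap

signal_strength = abs(sentiment_change) * confidence                # 0.24
target_shares = int(max_position_value * signal_strength / price)   # 12 shares
max_change = max(10, int(target_shares * 0.3))                      # floor of 10
print(min(max_change, target_shares))                               # -> 10 shares
```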
Deployment
Deploy the complete sentiment trading system:
```bash
# Install dependencies
npm install

# Deploy to development (simulated trading)
cdk deploy --all --context env=dev

# Deploy to production (requires Twitter and Alpaca API credentials)
cdk deploy --all --context env=prod \
  --context enableRealTrading=true \
  --context maxPortfolio=100000
```
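Before enabling anything live, it is worth pushing a synthetic tweet through the dev pipeline. A sketch using boto3 — the stream name below is a placeholder; use the value from your deployed stack's outputs:

```python
import json
import time
import boto3

kinesis = boto3.client('kinesis')
STREAM_NAME = 'REPLACE_WITH_TWEET_STREAM_NAME'  # placeholder; see stack outputs

# Synthetic record matching the shape the tweet collector emits
fake_tweet = {
    'tweet_id': 'test-0001',
    'author_id': '42',
    'author_username': 'integration_test',
    'author_followers': 1_000_000,
    'author_verified': True,
    'influence_score': 0.9,
    'text': '$TSLA deliveries crushed estimates this quarter',
    'mentioned_stocks': ['TSLA'],
    'created_at': '2026-01-24T09:45:00Z',
    'timestamp': int(time.time()),
    'engagement': {'retweet_count': 0, 'reply_count': 0,
                   'like_count': 0, 'quote_count': 0},
}

kinesis.put_record(
    StreamName=STREAM_NAME,
    Data=json.dumps(fake_tweet),
    PartitionKey=fake_tweet['tweet_id'],
)
print('Injected synthetic tweet; watch the sentiment table and CloudWatch logs')
```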
Monitoring Dashboard
Create a CloudWatch dashboard for real-time monitoring:

```typescript
new MonitoringConstruct(this, 'Monitoring', {
  tweetStream,
  sentimentTable,
  portfolioTable,
  metrics: [
    'TweetIngestionRate',
    'SentimentScoreDistribution',
    'TradingSignalsGenerated',
    'TradesExecuted',
    'PortfolioValue',
    'PortfolioPerformance'
  ]
});
```
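Custom metrics such as `PortfolioValue` have to be published by the Lambdas themselves. A minimal sketch of emitting one with boto3 — the `SentimentTrading` namespace is an assumption, not defined by the stack:

```python
import boto3

cloudwatch = boto3.client('cloudwatch')

def publish_portfolio_value(value: float) -> None:
    """Publish the current portfolio value as a custom CloudWatch metric."""
    cloudwatch.put_metric_data(
        Namespace='SentimentTrading',   # assumed namespace; pick your own
        MetricData=[{
            'MetricName': 'PortfolioValue',
            'Value': value,
            'Unit': 'None',
        }],
    )

# e.g. called at the end of the trade executor, after update_portfolio()
publish_portfolio_value(42_500.00)
```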
Security & Compliance
- API Key Security: Store all credentials in Secrets Manager with rotation
- Data Encryption: Enable encryption at rest for all DynamoDB tables and Kinesis streams
- IAM Least Privilege: Each Lambda has minimal required permissions
- Audit Logging: CloudTrail logs all API calls and trades
- SEC Compliance: Maintain complete trade history for regulatory requirements
Cost Optimization
- Lambda Memory: Right-size sentiment analysis functions (3GB for NLP)
- Kinesis Shards: Start with 2 shards, scale based on tweet volume
- DynamoDB: Use on-demand billing for unpredictable workloads
- Reserved Capacity: Move steady, predictable DynamoDB workloads from on-demand to provisioned or reserved capacity
Conclusion
This sentiment-driven trading system demonstrates the power of combining social media intelligence with cloud-native architecture. By analyzing X.com posts in real time with an ensemble of ML models, the system can surface potential trading signals before they appear in traditional news coverage.
Key Takeaways
- Real-Time Processing: Kinesis streams process millions of tweets with sub-second latency
- Multi-Model Sentiment: Ensemble approach improves accuracy using Bedrock, FinBERT, and Comprehend
- Risk Management: Multi-layered safety checks prevent catastrophic losses
- Scalable Architecture: Fully serverless design scales automatically with tweet volume
- Production-Ready: Complete monitoring, alerting, and compliance features
Disclaimer: This system is for educational purposes. Stock trading involves substantial risk. Social media sentiment can be manipulated. Always test thoroughly, never invest more than you can afford to lose, and comply with all securities regulations.
