Building a Sentiment-Driven US Stock Trading System with X.com Real-Time Analysis

Social media has become a powerful force in stock market movements, with influential posts capable of moving stock prices by significant percentages within minutes. This post explores building a production-ready automated US stock trading system that monitors X.com (Twitter) in real time, analyzes sentiment with multiple ML models, and executes trades on a configurable watchlist of stocks such as TSLA, GOOG, and NVDA based on that social media intelligence.

The Challenge: Trading on Social Sentiment

Building a reliable sentiment-driven trading system presents unique technical and business challenges:

  • Real-Time Tweet Streaming: Capturing millions of tweets per hour and filtering relevant content
  • Multi-Stock Monitoring: Tracking sentiment for dozens of stocks simultaneously
  • Sentiment Analysis at Scale: Processing natural language with ML models in real-time
  • Signal Quality: Distinguishing genuine market signals from noise and manipulation
  • Influencer Impact: Weighing tweets by author credibility and follower count
  • Market Hours: Handling pre-market, regular hours, and after-hours trading windows
  • Risk Management: Preventing losses from false signals or coordinated manipulation
  • Compliance: Meeting SEC regulations for automated trading systems

Why X.com + AWS for Sentiment Trading?

Before diving into implementation, let’s understand why this technology stack excels for social sentiment trading:

X.com: The Pulse of Market Sentiment

X.com (formerly Twitter) provides unparalleled real-time market sentiment data:

{
  "tweet_id": "1234567890",
  "author": {
    "username": "elonmusk",
    "followers": 150000000,
    "verified": true,
    "influence_score": 0.98
  },
  "text": "$TSLA production numbers exceeded expectations. Exciting times ahead!",
  "mentions": ["TSLA"],
  "timestamp": "2026-01-24T09:45:00Z",
  "engagement": {
    "likes": 250000,
    "retweets": 45000,
    "replies": 8000
  }
}

Key advantages:

  • Real-time data with sub-second latency
  • Influencer insights from market-moving accounts
  • Public sentiment aggregation across millions of users
  • Early indicators often preceding official news
  • Trend detection through hashtags and mentions

AWS Serverless Architecture: Scalable Real-Time Processing

Traditional Approach           | AWS Serverless Approach
-------------------------------|--------------------------
Self-hosted Kafka clusters     | Kinesis Data Streams
Custom NLP pipelines           | Bedrock + Comprehend
Manual scaling                 | Automatic Lambda scaling
Complex infrastructure         | Fully managed services
High operational cost          | Pay-per-use pricing

Architecture Overview: Real-Time Sentiment Trading Pipeline

Our architecture combines event streaming, natural language processing, and trading execution in a fully serverless design:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   X.com API     │    │    Lambda       │    │    Kinesis      │
│  (Streaming)    │ -> │ Tweet Collector │ -> │  Data Stream    │
│  Filtered by    │    │                 │    │                 │
│  Stock Symbols  │    └─────────────────┘    └─────────────────┘
└─────────────────┘                                    │
                                                       v
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   DynamoDB      │    │     Lambda      │    │    Lambda       │
│  Tweet Cache    │ <- │ Stream Processor│ <- │ Sentiment       │
│                 │    │                 │    │   Analyzer      │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                       │
                                                       v
                       ┌────────────────────────────────────────┐
                       │      Sentiment Analysis Layer          │
                       │                                        │
                       │  ┌──────────┐  ┌──────────┐  ┌──────┐ │
                       │  │ Bedrock  │  │HuggingFace│  │ AWS  │ │
                       │  │  Claude  │  │FinBERT    │  │Compre│ │
                       │  │          │  │  Model    │  │ hend │ │
                       │  └──────────┘  └──────────┘  └──────┘ │
                       └────────────────────────────────────────┘
                                         │
                                         v
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  DynamoDB       │    │  Step Functions │    │     Lambda      │
│Sentiment Score  │ -> │ Trading Logic   │ -> │  Trade Executor │
│  Aggregator     │    │                 │    │  (Alpaca API)   │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │
                                v
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   SNS Topic     │    │   CloudWatch    │    │    DynamoDB     │
│ Trade Alerts    │    │   Dashboards    │    │  Trade History  │
│                 │    │   & Metrics     │    │   & Portfolio   │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Data Flow Breakdown:

  1. Tweet Collection: X.com streaming API filtered by stock symbols ($TSLA, $GOOG, etc.)
  2. Stream Processing: Kinesis Data Stream buffers incoming tweets
  3. Sentiment Analysis: ML models analyze tweet sentiment and extract signals
  4. Aggregation: Sentiment scores aggregated per stock with time-decay weighting (see the sketch below)
  5. Trading Decisions: Step Functions orchestrates buy/sell logic based on sentiment thresholds
  6. Trade Execution: Lambda executes trades via Alpaca or Interactive Brokers API
  7. Monitoring: Real-time dashboards track sentiment trends and portfolio performance
  8. Alerts: SNS notifies stakeholders of significant sentiment shifts and trades
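
To make the time-decay weighting in step 4 concrete, here is a minimal sketch of the idea; the 30-minute window and the 0.1 weight floor match the signal aggregator code later in this post, while the function name itself is just illustrative:

from datetime import datetime, timezone

def decay_weighted_sentiment(items: list[dict], window_minutes: int = 30) -> float:
    """Aggregate per-tweet sentiment into one score, down-weighting older tweets.

    Each item needs: sentiment_score (-1..1), influence_score (0..1),
    confidence (0..1), and timestamp (epoch seconds).
    """
    now = datetime.now(timezone.utc).timestamp()
    total_score = total_weight = 0.0
    for item in items:
        age_minutes = (now - item['timestamp']) / 60
        # Linear decay to a 0.1 floor so old tweets never fully vanish
        time_weight = max(0.1, 1 - age_minutes / window_minutes)
        weight = item['influence_score'] * item['confidence'] * time_weight
        total_score += item['sentiment_score'] * weight
        total_weight += weight
    return total_score / total_weight if total_weight else 0.0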

Infrastructure as Code: AWS CDK Implementation

Let’s build this system step-by-step using AWS CDK with TypeScript.

Project Structure

sentiment-trading-cdk/
├── lib/
│   ├── sentiment-trading-stack.ts
│   ├── constructs/
│   │   ├── twitter-ingestion-construct.ts
│   │   ├── sentiment-analysis-construct.ts
│   │   ├── trading-engine-construct.ts
│   │   ├── portfolio-management-construct.ts
│   │   └── monitoring-construct.ts
│   └── lambda/
│       ├── tweet-collector/
│       ├── sentiment-analyzer/
│       ├── signal-aggregator/
│       ├── trade-executor/
│       └── risk-manager/
├── config/
│   ├── stocks-config.json
│   └── trading-rules.json
└── bin/
    └── sentiment-trading-app.ts

Configuration Management

Stock-specific configurations with sentiment thresholds and position limits:

// config/stocks-config.json
{
  "watchlist": [
    {
      "symbol": "TSLA",
      "name": "Tesla Inc.",
      "sentiment_threshold": 0.75,
      "max_position": 10000,
      "key_influencers": ["elonmusk", "teslarati", "WholeMarsBlog"],
      "keywords": ["Tesla", "TSLA", "Model", "FSD", "Cybertruck"],
      "trading_enabled": true
    },
    {
      "symbol": "GOOG",
      "name": "Alphabet Inc.",
      "sentiment_threshold": 0.70,
      "max_position": 15000,
      "key_influencers": ["sundarpichai", "Google", "googledevs"],
      "keywords": ["Google", "GOOG", "Alphabet", "AI", "Bard", "Gemini"],
      "trading_enabled": true
    },
    {
      "symbol": "NVDA",
      "name": "NVIDIA Corporation",
      "sentiment_threshold": 0.72,
      "max_position": 12000,
      "key_influencers": ["nvidia", "JensenHuang"],
      "keywords": ["NVIDIA", "NVDA", "GPU", "AI chip", "CUDA"],
      "trading_enabled": true
    },
    {
      "symbol": "META",
      "name": "Meta Platforms Inc.",
      "sentiment_threshold": 0.68,
      "max_position": 8000,
      "key_influencers": ["Meta", "zuck"],
      "keywords": ["Meta", "Facebook", "Instagram", "Metaverse", "VR"],
      "trading_enabled": true
    },
    {
      "symbol": "AAPL",
      "name": "Apple Inc.",
      "sentiment_threshold": 0.70,
      "max_position": 20000,
      "key_influencers": ["Apple", "tim_cook"],
      "keywords": ["Apple", "AAPL", "iPhone", "Vision Pro", "Mac"],
      "trading_enabled": true
    }
  ],
  "global_settings": {
    "sentiment_window_minutes": 30,
    "min_tweet_volume": 50,
    "influencer_weight_multiplier": 3.0,
    "max_daily_trades_per_stock": 5,
    "portfolio_max_allocation_pct": 20
  }
}
// config/trading-rules.json
{
  "dev": {
    "enable_real_trading": false,
    "max_total_portfolio": 10000,
    "sentiment_confidence_threshold": 0.75,
    "min_sentiment_change": 0.15,
    "market_hours_only": false
  },
  "prod": {
    "enable_real_trading": true,
    "max_total_portfolio": 100000,
    "sentiment_confidence_threshold": 0.82,
    "min_sentiment_change": 0.20,
    "market_hours_only": true,
    "pre_market_enabled": true,
    "after_hours_enabled": false
  }
}
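
A small helper for loading these files and selecting the right environment block might look like the following sketch. The file paths and the env argument are assumptions about how the CDK app wires things together, not code from the repo:

import json
from pathlib import Path

CONFIG_DIR = Path('config')  # assumed relative to the project root

def load_trading_config(env: str) -> dict:
    """Load the watchlist plus environment-specific trading rules."""
    stocks = json.loads((CONFIG_DIR / 'stocks-config.json').read_text())
    rules = json.loads((CONFIG_DIR / 'trading-rules.json').read_text())
    if env not in rules:
        raise ValueError(f"No trading rules defined for environment '{env}'")
    return {
        'watchlist': stocks['watchlist'],
        'global_settings': stocks['global_settings'],
        'rules': rules[env],
    }

# Example: the dev block keeps enable_real_trading false for simulated trades
config = load_trading_config('dev')
assert config['rules']['enable_real_trading'] is False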

1. Twitter/X.com Ingestion Layer

The ingestion construct handles real-time tweet collection and filtering:

// lib/constructs/twitter-ingestion-construct.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
import { Construct } from 'constructs';

export interface TwitterIngestionProps {
  stockWatchlist: any[];
  streamShardCount: number;
}

export class TwitterIngestionConstruct extends Construct {
  public readonly tweetStream: kinesis.Stream;
  public readonly tweetCache: dynamodb.Table;

  constructor(scope: Construct, id: string, props: TwitterIngestionProps) {
    super(scope, id);

    // Kinesis Data Stream for tweet ingestion
    this.tweetStream = new kinesis.Stream(this, 'TweetStream', {
      shardCount: props.streamShardCount,
      retentionPeriod: cdk.Duration.hours(24),
      encryption: kinesis.StreamEncryption.MANAGED,
    });

    // DynamoDB table for tweet deduplication and caching
    this.tweetCache = new dynamodb.Table(this, 'TweetCache', {
      partitionKey: { name: 'tweet_id', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      timeToLiveAttribute: 'ttl',
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    });

    // Add GSI for querying by stock symbol
    this.tweetCache.addGlobalSecondaryIndex({
      indexName: 'SymbolIndex',
      partitionKey: { name: 'stock_symbol', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      projectionType: dynamodb.ProjectionType.ALL,
    });

    // Secret for Twitter API credentials
    const twitterApiSecret = secretsmanager.Secret.fromSecretNameV2(
      this,
      'TwitterApiSecret',
      'prod/twitter-api-credentials'
    );

    // Lambda for Twitter stream collection
    const tweetCollectorFn = new lambda.Function(this, 'TweetCollector', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/tweet-collector'),
      timeout: cdk.Duration.minutes(15),
      memorySize: 1024,
      environment: {
        KINESIS_STREAM_NAME: this.tweetStream.streamName,
        TWEET_CACHE_TABLE: this.tweetCache.tableName,
        STOCK_WATCHLIST: JSON.stringify(props.stockWatchlist),
        TWITTER_API_SECRET_NAME: twitterApiSecret.secretName,
      },
      reservedConcurrentExecutions: 1,  // Single instance for streaming
    });

    // Grant permissions
    this.tweetStream.grantWrite(tweetCollectorFn);
    this.tweetCache.grantWriteData(tweetCollectorFn);
    twitterApiSecret.grantRead(tweetCollectorFn);

    // Keep-alive rule to maintain the Twitter stream connection. With
    // reserved concurrency of 1, overlapping invocations are throttled,
    // so this effectively restarts the collector after a run exits.
    const keepAliveRule = new events.Rule(this, 'StreamKeepAlive', {
      schedule: events.Schedule.rate(cdk.Duration.minutes(5)),
      description: 'Keep Twitter stream connection alive',
    });

    keepAliveRule.addTarget(new targets.LambdaFunction(tweetCollectorFn));

    // CloudWatch alarm for stream health
    this.tweetStream.metricGetRecordsSuccess().createAlarm(this, 'StreamHealthAlarm', {
      threshold: 1,
      evaluationPeriods: 3,
      comparisonOperator: cdk.aws_cloudwatch.ComparisonOperator.LESS_THAN_THRESHOLD,
      treatMissingData: cdk.aws_cloudwatch.TreatMissingData.BREACHING,
    });
  }
}

Tweet Collection Lambda Implementation

# lib/lambda/tweet-collector/index.py
import json
import os
import re
import boto3
import tweepy
from datetime import datetime
from decimal import Decimal
from typing import List, Dict

kinesis_client = boto3.client('kinesis')
dynamodb = boto3.resource('dynamodb')
secrets_client = boto3.client('secretsmanager')

KINESIS_STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
TWEET_CACHE_TABLE = os.environ['TWEET_CACHE_TABLE']
STOCK_WATCHLIST = json.loads(os.environ['STOCK_WATCHLIST'])
TWITTER_API_SECRET_NAME = os.environ['TWITTER_API_SECRET_NAME']

# Global stream listener, reused across warm invocations
stream_listener = None

class SentimentStreamListener(tweepy.StreamingClient):
    """Custom Twitter stream listener for stock mentions"""

    def __init__(self, bearer_token, stock_watchlist):
        super().__init__(bearer_token, wait_on_rate_limit=True)
        self.stock_watchlist = stock_watchlist
        self.tweet_cache_table = dynamodb.Table(TWEET_CACHE_TABLE)
        # StreamingClient has no user-lookup methods; use a REST client for those
        self.api_client = tweepy.Client(bearer_token, wait_on_rate_limit=True)

    def on_tweet(self, tweet):
        """Process incoming tweets"""
        try:
            # Extract stock symbols mentioned
            mentioned_stocks = self.extract_stock_symbols(tweet.text)

            if not mentioned_stocks:
                return

            # Get author info
            author_info = self.get_author_info(tweet.author_id)

            # Calculate influence score
            influence_score = self.calculate_influence_score(
                author_info,
                mentioned_stocks
            )

            # Prepare tweet data
            tweet_data = {
                'tweet_id': str(tweet.id),
                'author_id': str(tweet.author_id),
                'author_username': author_info['username'],
                'author_followers': author_info['followers_count'],
                'author_verified': author_info.get('verified', False),
                'influence_score': influence_score,
                'text': tweet.text,
                'mentioned_stocks': mentioned_stocks,
                'created_at': tweet.created_at.isoformat(),
                'timestamp': int(tweet.created_at.timestamp()),
                'engagement': {
                    'retweet_count': tweet.public_metrics.get('retweet_count', 0),
                    'reply_count': tweet.public_metrics.get('reply_count', 0),
                    'like_count': tweet.public_metrics.get('like_count', 0),
                    'quote_count': tweet.public_metrics.get('quote_count', 0),
                },
            }

            # Send to Kinesis for processing
            self.send_to_kinesis(tweet_data)

            # Cache in DynamoDB for deduplication
            self.cache_tweet(tweet_data)

            print(f"Processed tweet {tweet.id} mentioning {mentioned_stocks}")

        except Exception as e:
            print(f"Error processing tweet: {e}")

    def extract_stock_symbols(self, text: str) -> List[str]:
        """Extract stock symbols from tweet text"""
        mentioned = []
        text_upper = text.upper()

        for stock in self.stock_watchlist:
            symbol = stock['symbol']

            # Check for $SYMBOL cashtags
            if f"${symbol}" in text_upper:
                mentioned.append(symbol)
                continue

            # Match keywords as whole words; a plain substring test
            # would match "AI" inside words like "SAID"
            for keyword in stock.get('keywords', []):
                if re.search(rf"\b{re.escape(keyword.upper())}\b", text_upper):
                    mentioned.append(symbol)
                    break

        return list(set(mentioned))  # Remove duplicates

    def get_author_info(self, author_id: str) -> Dict:
        """Fetch author information from Twitter API"""
        # This would be cached in practice
        try:
            user = self.api_client.get_user(
                id=author_id,
                user_fields=['public_metrics', 'verified']
            )
            return {
                'username': user.data.username,
                'followers_count': user.data.public_metrics['followers_count'],
                'verified': user.data.verified,
            }
        except Exception:
            return {
                'username': 'unknown',
                'followers_count': 0,
                'verified': False,
            }

    def calculate_influence_score(self, author_info: Dict, mentioned_stocks: List[str]) -> float:
        """Calculate tweet influence score based on author and context"""

        score = 0.5  # Base score

        # Follower count influence (tiered)
        followers = author_info['followers_count']
        if followers > 1000000:
            score += 0.3
        elif followers > 100000:
            score += 0.2
        elif followers > 10000:
            score += 0.1

        # Verified account bonus
        if author_info.get('verified'):
            score += 0.1

        # Key influencer bonus
        username = author_info['username'].lower()
        for stock in self.stock_watchlist:
            if stock['symbol'] in mentioned_stocks:
                key_influencers = [inf.lower() for inf in stock.get('key_influencers', [])]
                if username in key_influencers:
                    score += 0.3
                    break

        return min(score, 1.0)  # Cap at 1.0

    def send_to_kinesis(self, tweet_data: Dict):
        """Send tweet data to Kinesis stream"""
        try:
            kinesis_client.put_record(
                StreamName=KINESIS_STREAM_NAME,
                Data=json.dumps(tweet_data),
                PartitionKey=tweet_data['tweet_id']
            )
        except Exception as e:
            print(f"Error sending to Kinesis: {e}")

    def cache_tweet(self, tweet_data: Dict):
        """Cache tweet in DynamoDB for deduplication"""
        try:
            # Add TTL (24 hours)
            ttl = int(datetime.utcnow().timestamp()) + (24 * 60 * 60)

            # Store for each mentioned stock
            for symbol in tweet_data['mentioned_stocks']:
                self.tweet_cache_table.put_item(Item={
                    'tweet_id': tweet_data['tweet_id'],
                    'timestamp': tweet_data['timestamp'],
                    'stock_symbol': symbol,
                    'author_username': tweet_data['author_username'],
                    # The DynamoDB resource API rejects floats; use Decimal
                    'influence_score': Decimal(str(tweet_data['influence_score'])),
                    'text': tweet_data['text'],
                    'ttl': ttl,
                })
        except Exception as e:
            print(f"Error caching tweet: {e}")

    def on_errors(self, errors):
        print(f"Twitter API errors: {errors}")

def handler(event, context):
    """Lambda handler to maintain Twitter stream connection"""
    global stream_listener

    # Get Twitter API credentials
    secret = secrets_client.get_secret_value(SecretId=TWITTER_API_SECRET_NAME)
    credentials = json.loads(secret['SecretString'])
    bearer_token = credentials['bearer_token']

    # Initialize stream listener if not exists
    if stream_listener is None:
        stream_listener = SentimentStreamListener(bearer_token, STOCK_WATCHLIST)

        # Build filter rules for all stocks
        rules = []
        for stock in STOCK_WATCHLIST:
            symbol = stock['symbol']
            # Track cashtag and keywords
            rules.append(f"${symbol}")
            for keyword in stock.get('keywords', [])[:3]:  # Limit keywords
                rules.append(keyword)

        # Delete existing rules
        existing_rules = stream_listener.get_rules()
        if existing_rules.data:
            rule_ids = [rule.id for rule in existing_rules.data]
            stream_listener.delete_rules(rule_ids)

        # Add new rules
        for rule in rules[:25]:  # Twitter API rule limit
            stream_listener.add_rules(tweepy.StreamRule(rule))

    # Start streaming; threaded=True returns immediately. Lambda freezes
    # the environment between invocations, so the 5-minute keep-alive rule
    # is what resumes (or restarts) the stream thread.
    try:
        stream_listener.filter(
            tweet_fields=['author_id', 'created_at', 'public_metrics', 'text'],
            user_fields=['username', 'public_metrics', 'verified'],
            threaded=True
        )
        return {
            'statusCode': 200,
            'body': 'Twitter stream active'
        }
    except Exception as e:
        print(f"Stream error: {e}")
        return {
            'statusCode': 500,
            'body': f'Stream error: {str(e)}'
        }
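
The cache table is described as serving deduplication, but the collector above writes unconditionally. One way to actually enforce it is a conditional put, sketched here as an illustrative helper (not part of the original code):

from botocore.exceptions import ClientError

def cache_tweet_once(table, item: dict) -> bool:
    """Insert a tweet only if its primary key has not been seen before.

    Returns True when the tweet was newly cached, False for duplicates.
    """
    try:
        table.put_item(
            Item=item,
            # Reject the write if an item with this key already exists
            ConditionExpression='attribute_not_exists(tweet_id)',
        )
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return False
        raise

Downstream consumers could then skip Kinesis publication entirely when cache_tweet_once returns False, so duplicate deliveries from the stream never reach sentiment analysis.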

2. Sentiment Analysis Layer

The sentiment analysis construct processes tweets using multiple ML models:

// lib/constructs/sentiment-analysis-construct.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
import { Construct } from 'constructs';

export interface SentimentAnalysisProps {
  tweetStream: kinesis.IStream;
  enabledModels: string[];  // ['bedrock', 'huggingface', 'comprehend']
}

export class SentimentAnalysisConstruct extends Construct {
  public readonly sentimentTable: dynamodb.Table;

  constructor(scope: Construct, id: string, props: SentimentAnalysisProps) {
    super(scope, id);

    // DynamoDB table for sentiment scores
    this.sentimentTable = new dynamodb.Table(this, 'SentimentScores', {
      partitionKey: { name: 'stock_symbol', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      timeToLiveAttribute: 'ttl',
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    });

    // GSI for querying recent sentiment
    this.sentimentTable.addGlobalSecondaryIndex({
      indexName: 'RecentSentimentIndex',
      partitionKey: { name: 'stock_symbol', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'sentiment_score', type: dynamodb.AttributeType.NUMBER },
      projectionType: dynamodb.ProjectionType.ALL,
    });

    // Lambda Layer for NLP libraries
    const nlpLayer = new lambda.LayerVersion(this, 'NLPLibrariesLayer', {
      code: lambda.Code.fromAsset('lib/lambda/layers/nlp-libs'),
      compatibleRuntimes: [lambda.Runtime.PYTHON_3_11],
      description: 'NLP libraries: transformers, torch, nltk',
    });

    // Sentiment Analysis Lambda
    const sentimentAnalyzerFn = new lambda.Function(this, 'SentimentAnalyzer', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/sentiment-analyzer'),
      timeout: cdk.Duration.seconds(300),
      memorySize: 3008,  // High memory for NLP workloads
      layers: [nlpLayer],
      environment: {
        SENTIMENT_TABLE: this.sentimentTable.tableName,
        ENABLED_MODELS: JSON.stringify(props.enabledModels),
        BEDROCK_MODEL_ID: 'anthropic.claude-3-sonnet-20240229-v1:0',
      },
      reservedConcurrentExecutions: 10,  // Limit concurrent executions
    });

    // Grant permissions
    this.sentimentTable.grantWriteData(sentimentAnalyzerFn);

    // Bedrock access
    if (props.enabledModels.includes('bedrock')) {
      sentimentAnalyzerFn.addToRolePolicy(new iam.PolicyStatement({
        actions: ['bedrock:InvokeModel'],
        resources: ['*'],
      }));
    }

    // Comprehend access
    if (props.enabledModels.includes('comprehend')) {
      sentimentAnalyzerFn.addToRolePolicy(new iam.PolicyStatement({
        actions: [
          'comprehend:DetectSentiment',
          'comprehend:DetectEntities',
        ],
        resources: ['*'],
      }));
    }

    // Connect to Kinesis stream
    sentimentAnalyzerFn.addEventSource(
      new lambdaEventSources.KinesisEventSource(props.tweetStream, {
        batchSize: 10,
        startingPosition: lambda.StartingPosition.LATEST,
        retryAttempts: 3,
        parallelizationFactor: 5,
      })
    );

    // Signal Aggregator Lambda (DynamoDB Stream trigger)
    const signalAggregatorFn = new lambda.Function(this, 'SignalAggregator', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/signal-aggregator'),
      timeout: cdk.Duration.seconds(60),
      memorySize: 1024,
      environment: {
        SENTIMENT_TABLE: this.sentimentTable.tableName,
      },
    });

    this.sentimentTable.grantReadWriteData(signalAggregatorFn);

    // Trigger on sentiment updates
    signalAggregatorFn.addEventSource(
      new lambdaEventSources.DynamoEventSource(this.sentimentTable, {
        startingPosition: lambda.StartingPosition.LATEST,
        batchSize: 100,
        retryAttempts: 2,
      })
    );
  }
}

Sentiment Analysis Lambda Implementation

# lib/lambda/sentiment-analyzer/index.py
import json
import os
import boto3
import base64
from datetime import datetime
from typing import Dict, List
from decimal import Decimal

bedrock_runtime = boto3.client('bedrock-runtime')
comprehend_client = boto3.client('comprehend')
dynamodb = boto3.resource('dynamodb')

SENTIMENT_TABLE = os.environ['SENTIMENT_TABLE']
ENABLED_MODELS = json.loads(os.environ['ENABLED_MODELS'])
BEDROCK_MODEL_ID = os.environ['BEDROCK_MODEL_ID']

def handler(event, context):
    """Analyze sentiment from Kinesis stream records"""

    processed = 0
    failed = 0

    for record in event['Records']:
        try:
            # Decode tweet data
            payload = base64.b64decode(record['kinesis']['data'])
            tweet_data = json.loads(payload)

            # Analyze sentiment using multiple models
            sentiment_results = analyze_tweet_sentiment(tweet_data)

            # Store sentiment scores
            store_sentiment_scores(tweet_data, sentiment_results)

            processed += 1

        except Exception as e:
            print(f"Error processing record: {e}")
            failed += 1

    return {
        'statusCode': 200,
        'processed': processed,
        'failed': failed
    }

def analyze_tweet_sentiment(tweet_data: Dict) -> Dict:
    """Analyze sentiment using multiple models"""

    text = tweet_data['text']
    results = {}

    # AWS Comprehend
    if 'comprehend' in ENABLED_MODELS:
        results['comprehend'] = analyze_with_comprehend(text)

    # AWS Bedrock (Claude)
    if 'bedrock' in ENABLED_MODELS:
        results['bedrock'] = analyze_with_bedrock(text, tweet_data)

    # HuggingFace FinBERT (if deployed)
    if 'huggingface' in ENABLED_MODELS:
        results['huggingface'] = analyze_with_finbert(text)

    # Ensemble the results
    ensemble_sentiment = ensemble_sentiment_scores(results)

    return {
        'individual': results,
        'ensemble': ensemble_sentiment,
        'confidence': calculate_confidence(results)
    }

def analyze_with_comprehend(text: str) -> Dict:
    """Analyze sentiment using AWS Comprehend"""
    try:
        response = comprehend_client.detect_sentiment(
            Text=text,
            LanguageCode='en'
        )

        sentiment = response['Sentiment']
        scores = response['SentimentScore']

        # Convert to normalized score (-1 to 1)
        if sentiment == 'POSITIVE':
            score = scores['Positive']
        elif sentiment == 'NEGATIVE':
            score = -scores['Negative']
        elif sentiment == 'NEUTRAL':
            score = 0
        else:  # MIXED
            score = scores['Positive'] - scores['Negative']

        return {
            'model': 'comprehend',
            'sentiment': sentiment,
            'score': score,
            'confidence': max(scores.values()),
        }

    except Exception as e:
        print(f"Comprehend error: {e}")
        return None

def analyze_with_bedrock(text: str, tweet_data: Dict) -> Dict:
    """Analyze sentiment using AWS Bedrock Claude"""

    author = tweet_data['author_username']
    influence = tweet_data['influence_score']
    stocks = tweet_data['mentioned_stocks']

    prompt = f"""You are a financial sentiment analyst. Analyze the following tweet about stocks {', '.join(stocks)}.

Tweet: "{text}"
Author: @{author} (Influence score: {influence:.2f})

Provide sentiment analysis in JSON format:
{{
    "sentiment": "<positive|negative|neutral>",
    "score": <-1.0 to 1.0>,
    "confidence": <0.0 to 1.0>,
    "market_impact": "<high|medium|low>",
    "reasoning": "<brief explanation>",
    "key_factors": ["<factor1>", "<factor2>"]
}}

Consider:
1. Financial context and terminology
2. Author's influence and credibility
3. Specific stock mentions and context
4. Market timing and relevance"""

    try:
        response = bedrock_runtime.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 500,
                "messages": [{
                    "role": "user",
                    "content": prompt
                }]
            })
        )

        result = json.loads(response['body'].read())
        analysis = json.loads(result['content'][0]['text'])

        return {
            'model': 'bedrock-claude',
            'sentiment': analysis['sentiment'],
            'score': analysis['score'],
            'confidence': analysis['confidence'],
            'market_impact': analysis['market_impact'],
            'reasoning': analysis['reasoning'],
            'key_factors': analysis['key_factors'],
        }

    except Exception as e:
        print(f"Bedrock error: {e}")
        return None

def analyze_with_finbert(text: str) -> Dict:
    """Analyze sentiment using FinBERT model"""
    # This would call a SageMaker endpoint with FinBERT
    # Placeholder implementation
    try:
        # In production, this would invoke a SageMaker endpoint
        # For now, return mock data
        return {
            'model': 'finbert',
            'sentiment': 'positive',
            'score': 0.65,
            'confidence': 0.75,
        }
    except Exception as e:
        print(f"FinBERT error: {e}")
        return None

def ensemble_sentiment_scores(results: Dict) -> Dict:
    """Ensemble multiple sentiment scores with weighting"""

    valid_results = [r for r in results.values() if r is not None]

    if not valid_results:
        return None

    # Weight models differently
    model_weights = {
        'bedrock-claude': 0.4,
        'finbert': 0.35,
        'comprehend': 0.25,
    }

    total_weight = 0
    weighted_score = 0

    for result in valid_results:
        model = result['model']
        weight = model_weights.get(model, 0.33)
        weighted_score += result['score'] * weight * result['confidence']
        total_weight += weight * result['confidence']

    final_score = weighted_score / total_weight if total_weight > 0 else 0

    # Determine sentiment label
    if final_score > 0.2:
        sentiment = 'POSITIVE'
    elif final_score < -0.2:
        sentiment = 'NEGATIVE'
    else:
        sentiment = 'NEUTRAL'

    return {
        'sentiment': sentiment,
        'score': final_score,
        'num_models': len(valid_results),
    }

def calculate_confidence(results: Dict) -> float:
    """Calculate confidence based on model agreement"""

    valid_results = [r for r in results.values() if r is not None]

    if not valid_results:
        return 0.0

    # Check sentiment agreement
    sentiments = [r['sentiment'].upper() for r in valid_results]
    most_common = max(set(sentiments), key=sentiments.count)
    agreement_ratio = sentiments.count(most_common) / len(sentiments)

    # Average confidence
    avg_confidence = sum(r['confidence'] for r in valid_results) / len(valid_results)

    return agreement_ratio * avg_confidence

def store_sentiment_scores(tweet_data: Dict, sentiment_results: Dict):
    """Store sentiment scores in DynamoDB"""

    table = dynamodb.Table(SENTIMENT_TABLE)
    timestamp = int(datetime.utcnow().timestamp())

    # Store for each mentioned stock
    for symbol in tweet_data['mentioned_stocks']:
        ensemble = sentiment_results['ensemble']

        if ensemble is None:
            continue

        table.put_item(Item={
            'stock_symbol': symbol,
            'timestamp': timestamp,
            'tweet_id': tweet_data['tweet_id'],
            'author_username': tweet_data['author_username'],
            'influence_score': Decimal(str(tweet_data['influence_score'])),
            'sentiment': ensemble['sentiment'],
            'sentiment_score': Decimal(str(ensemble['score'])),
            'confidence': Decimal(str(sentiment_results['confidence'])),
            'text': tweet_data['text'],
            'engagement': tweet_data['engagement'],
            'individual_models': json.dumps(sentiment_results['individual'], default=str),
            'ttl': timestamp + (24 * 60 * 60),  # 24 hour retention
        })
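
The analyze_with_finbert function above is a stub. If you deploy FinBERT behind a SageMaker real-time endpoint, the call might look like this sketch; the endpoint name and the response schema are assumptions about your deployment, not a fixed API:

import json
import boto3

sagemaker_runtime = boto3.client('sagemaker-runtime')

# Hypothetical endpoint name; set to whatever you actually deploy
FINBERT_ENDPOINT = 'finbert-sentiment-endpoint'

def analyze_with_finbert(text: str) -> dict:
    """Invoke a FinBERT SageMaker endpoint and normalize its output."""
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName=FINBERT_ENDPOINT,
        ContentType='application/json',
        Body=json.dumps({'inputs': text}),
    )
    # Assumed response shape: [{"label": "positive", "score": 0.87}, ...]
    predictions = json.loads(response['Body'].read())
    best = max(predictions, key=lambda p: p['score'])
    signed = {'positive': 1, 'neutral': 0, 'negative': -1}[best['label']]
    return {
        'model': 'finbert',
        'sentiment': best['label'],
        'score': signed * best['score'],
        'confidence': best['score'],
    }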

Signal Aggregator Lambda

# lib/lambda/signal-aggregator/index.py
import json
import os
import boto3
from datetime import datetime, timedelta
from typing import Dict, Optional
from collections import defaultdict

dynamodb = boto3.resource('dynamodb')
eventbridge_client = boto3.client('events')

SENTIMENT_TABLE = os.environ['SENTIMENT_TABLE']

def handler(event, context):
    """Aggregate sentiment signals and detect trading opportunities"""

    # Group sentiment updates by stock
    stock_updates = defaultdict(list)

    for record in event['Records']:
        if record['eventName'] in ['INSERT', 'MODIFY']:
            new_image = record['dynamodb']['NewImage']
            stock_symbol = new_image['stock_symbol']['S']
            stock_updates[stock_symbol].append(new_image)

    # Analyze each stock
    trading_signals = []

    for symbol, updates in stock_updates.items():
        signal = analyze_stock_sentiment(symbol)
        if signal:
            trading_signals.append(signal)

    # Emit trading signals to EventBridge
    for signal in trading_signals:
        emit_trading_signal(signal)

    return {
        'statusCode': 200,
        'signals_generated': len(trading_signals)
    }

def analyze_stock_sentiment(symbol: str) -> Optional[Dict]:
    """Analyze aggregated sentiment for a stock"""

    table = dynamodb.Table(SENTIMENT_TABLE)

    # Query recent sentiment (last 30 minutes)
    start_time = int((datetime.utcnow() - timedelta(minutes=30)).timestamp())

    response = table.query(
        KeyConditionExpression='stock_symbol = :symbol AND #ts >= :start_time',
        ExpressionAttributeNames={'#ts': 'timestamp'},
        ExpressionAttributeValues={
            ':symbol': symbol,
            ':start_time': start_time,
        }
    )

    items = response['Items']

    if len(items) < 10:  # Minimum tweet volume
        return None

    # Calculate weighted sentiment score
    total_score = 0
    total_weight = 0

    for item in items:
        # DynamoDB returns numbers as Decimal; convert before mixing with floats
        score = float(item['sentiment_score'])
        influence = float(item['influence_score'])
        confidence = float(item['confidence'])

        # Time decay: recent tweets have more weight
        age_minutes = (datetime.utcnow().timestamp() - float(item['timestamp'])) / 60
        time_weight = max(0.1, 1 - (age_minutes / 30))

        weight = influence * confidence * time_weight
        total_score += score * weight
        total_weight += weight

    if total_weight == 0:
        return None

    avg_sentiment = total_score / total_weight

    # Calculate sentiment change (compare to 1 hour ago)
    previous_sentiment = get_previous_sentiment(symbol, hours=1)
    sentiment_change = avg_sentiment - previous_sentiment if previous_sentiment else 0

    # Determine if this is a trading signal
    if abs(sentiment_change) < 0.15:  # Minimum change threshold
        return None

    return {
        'stock_symbol': symbol,
        'current_sentiment': avg_sentiment,
        'previous_sentiment': previous_sentiment,
        'sentiment_change': sentiment_change,
        'direction': 'BUY' if sentiment_change > 0 else 'SELL',
        'tweet_volume': len(items),
        'confidence': total_weight / len(items),
        'timestamp': int(datetime.utcnow().timestamp()),
    }

def get_previous_sentiment(symbol: str, hours: int = 1) -> float:
    """Get average sentiment from previous time period"""

    table = dynamodb.Table(SENTIMENT_TABLE)

    end_time = int((datetime.utcnow() - timedelta(hours=hours)).timestamp())
    start_time = end_time - (30 * 60)  # 30 minutes window

    response = table.query(
        KeyConditionExpression='stock_symbol = :symbol AND #ts BETWEEN :start AND :end',
        ExpressionAttributeNames={'#ts': 'timestamp'},
        ExpressionAttributeValues={
            ':symbol': symbol,
            ':start': start_time,
            ':end': end_time,
        }
    )

    items = response['Items']

    if not items:
        return 0.0

    avg = sum(float(item['sentiment_score']) for item in items) / len(items)
    return avg

def emit_trading_signal(signal: Dict):
    """Emit trading signal to EventBridge"""

    try:
        eventbridge_client.put_events(
            Entries=[{
                'Source': 'sentiment.trading',
                'DetailType': 'TradingSignal',
                'Detail': json.dumps(signal, default=str),
                'EventBusName': 'default',
            }]
        )
        print(f"Emitted trading signal for {signal['stock_symbol']}: {signal['direction']}")
    except Exception as e:
        print(f"Error emitting signal: {e}")
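
For reference, the trading workflow in the next section is triggered directly by this EventBridge event, so the first state receives the full envelope with the signal nested under detail. A representative payload (all values illustrative) looks like:

# Representative EventBridge event as delivered to the trading workflow
sample_event = {
    "source": "sentiment.trading",
    "detail-type": "TradingSignal",
    "detail": {
        "stock_symbol": "TSLA",
        "current_sentiment": 0.41,
        "previous_sentiment": 0.18,
        "sentiment_change": 0.23,
        "direction": "BUY",
        "tweet_volume": 87,
        "confidence": 0.62,
        "timestamp": 1769247900,
    },
}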

3. Trading Engine with Portfolio Management

// lib/constructs/trading-engine-construct.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
import { Construct } from 'constructs';

export interface TradingEngineProps {
  tradingRules: any;
  stockWatchlist: any[];
}

export class TradingEngineConstruct extends Construct {
  public readonly portfolioTable: dynamodb.Table;
  public readonly tradeHistoryTable: dynamodb.Table;
  public readonly alertTopic: sns.Topic;

  constructor(scope: Construct, id: string, props: TradingEngineProps) {
    super(scope, id);

    // Portfolio state table
    this.portfolioTable = new dynamodb.Table(this, 'Portfolio', {
      partitionKey: { name: 'stock_symbol', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    });

    // Trade history table
    this.tradeHistoryTable = new dynamodb.Table(this, 'TradeHistory', {
      partitionKey: { name: 'trade_id', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    });

    // Add GSI for querying by stock
    this.tradeHistoryTable.addGlobalSecondaryIndex({
      indexName: 'StockTradesIndex',
      partitionKey: { name: 'stock_symbol', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
    });

    // SNS topic for alerts
    this.alertTopic = new sns.Topic(this, 'TradingAlerts', {
      displayName: 'Sentiment Trading Alerts',
    });

    // Alpaca API credentials secret
    const alpacaApiSecret = secretsmanager.Secret.fromSecretNameV2(
      this,
      'AlpacaApiSecret',
      'prod/alpaca-api-credentials'
    );

    // Risk Manager Lambda
    const riskManagerFn = new lambda.Function(this, 'RiskManager', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/risk-manager'),
      timeout: cdk.Duration.seconds(30),
      environment: {
        PORTFOLIO_TABLE: this.portfolioTable.tableName,
        TRADE_HISTORY_TABLE: this.tradeHistoryTable.tableName,
        MAX_PORTFOLIO_VALUE: props.tradingRules.max_total_portfolio.toString(),
        MAX_DAILY_TRADES: props.tradingRules.max_daily_trades_per_stock?.toString() || '5',
        STOCK_WATCHLIST: JSON.stringify(props.stockWatchlist),
      },
    });

    this.portfolioTable.grantReadData(riskManagerFn);
    this.tradeHistoryTable.grantReadData(riskManagerFn);

    // Trade Executor Lambda
    const tradeExecutorFn = new lambda.Function(this, 'TradeExecutor', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lib/lambda/trade-executor'),
      timeout: cdk.Duration.seconds(60),
      environment: {
        PORTFOLIO_TABLE: this.portfolioTable.tableName,
        TRADE_HISTORY_TABLE: this.tradeHistoryTable.tableName,
        ALERT_TOPIC_ARN: this.alertTopic.topicArn,
        ENABLE_REAL_TRADING: props.tradingRules.enable_real_trading.toString(),
        ALPACA_API_SECRET_NAME: alpacaApiSecret.secretName,
        MARKET_HOURS_ONLY: props.tradingRules.market_hours_only?.toString() || 'true',
      },
    });

    this.portfolioTable.grantReadWriteData(tradeExecutorFn);
    this.tradeHistoryTable.grantWriteData(tradeExecutorFn);
    this.alertTopic.grantPublish(tradeExecutorFn);
    alpacaApiSecret.grantRead(tradeExecutorFn);

    // Step Functions workflow
    const checkRisk = new tasks.LambdaInvoke(this, 'CheckRisk', {
      lambdaFunction: riskManagerFn,
      outputPath: '$.Payload',
    });

    const executeTrade = new tasks.LambdaInvoke(this, 'ExecuteTrade', {
      lambdaFunction: tradeExecutorFn,
      outputPath: '$.Payload',
    });

    const riskApproved = new sfn.Choice(this, 'RiskApproved')
      .when(
        sfn.Condition.booleanEquals('$.risk_approved', true),
        executeTrade
      )
      .otherwise(new sfn.Succeed(this, 'TradeRejected'));

    const definition = checkRisk.next(riskApproved);

    const tradingStateMachine = new sfn.StateMachine(this, 'TradingWorkflow', {
      definition,
      timeout: cdk.Duration.minutes(5),
    });

    // EventBridge rule to trigger on trading signals
    new events.Rule(this, 'TradingSignalRule', {
      eventPattern: {
        source: ['sentiment.trading'],
        detailType: ['TradingSignal'],
      },
      targets: [new targets.SfnStateMachine(tradingStateMachine)],
    });
  }
}
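
The risk-manager Lambda wired above (lib/lambda/risk-manager) is not listed in this post. A minimal sketch consistent with the environment variables the construct passes in might look like the following; treat it as illustrative, not the actual implementation. Note that because CheckRisk uses outputPath: '$.Payload', the risk manager must echo the incoming event through so the signal survives to the executor:

# lib/lambda/risk-manager/index.py (illustrative sketch)
import os
import boto3
from datetime import datetime, timedelta
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')

PORTFOLIO_TABLE = os.environ['PORTFOLIO_TABLE']
TRADE_HISTORY_TABLE = os.environ['TRADE_HISTORY_TABLE']
MAX_PORTFOLIO_VALUE = float(os.environ['MAX_PORTFOLIO_VALUE'])
MAX_DAILY_TRADES = int(os.environ['MAX_DAILY_TRADES'])

def handler(event, context):
    """Approve or reject a trading signal before execution."""
    signal = event.get('detail', event)
    symbol = signal['stock_symbol']

    # Rule 1: cap the number of trades per stock per rolling day
    since = int((datetime.utcnow() - timedelta(days=1)).timestamp())
    trades = dynamodb.Table(TRADE_HISTORY_TABLE).query(
        IndexName='StockTradesIndex',
        KeyConditionExpression=Key('stock_symbol').eq(symbol) & Key('timestamp').gte(since),
    )['Items']
    if len(trades) >= MAX_DAILY_TRADES:
        return {**event, 'risk_approved': False, 'risk_reason': 'Daily trade limit reached'}

    # Rule 2: keep total portfolio value under the configured ceiling
    portfolio = dynamodb.Table(PORTFOLIO_TABLE).scan()['Items']
    total_value = sum(float(p.get('total_value', 0)) for p in portfolio)
    if signal['direction'] == 'BUY' and total_value >= MAX_PORTFOLIO_VALUE:
        return {**event, 'risk_approved': False, 'risk_reason': 'Portfolio ceiling reached'}

    # Pass the original event through with the approval flag attached
    return {**event, 'risk_approved': True}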

Trade Executor Implementation

# lib/lambda/trade-executor/index.py
import json
import os
import boto3
from datetime import datetime
import alpaca_trade_api as tradeapi
from decimal import Decimal

dynamodb = boto3.resource('dynamodb')
sns_client = boto3.client('sns')
secrets_client = boto3.client('secretsmanager')

PORTFOLIO_TABLE = os.environ['PORTFOLIO_TABLE']
TRADE_HISTORY_TABLE = os.environ['TRADE_HISTORY_TABLE']
ALERT_TOPIC_ARN = os.environ['ALERT_TOPIC_ARN']
ENABLE_REAL_TRADING = os.environ['ENABLE_REAL_TRADING'].lower() == 'true'
ALPACA_API_SECRET_NAME = os.environ['ALPACA_API_SECRET_NAME']
MARKET_HOURS_ONLY = os.environ.get('MARKET_HOURS_ONLY', 'true').lower() == 'true'

# Alpaca API client, initialized lazily and reused across warm invocations
alpaca_api = None

def get_alpaca_client():
    """Initialize Alpaca API client"""
    global alpaca_api

    if alpaca_api is None:
        secret = secrets_client.get_secret_value(SecretId=ALPACA_API_SECRET_NAME)
        credentials = json.loads(secret['SecretString'])

        alpaca_api = tradeapi.REST(
            credentials['api_key'],
            credentials['api_secret'],
            credentials['base_url'],  # Paper or live trading URL
            api_version='v2'
        )

    return alpaca_api

def handler(event, context):
    """Execute trade based on sentiment signal and risk approval"""

    signal = event.get('detail', event)

    if not event.get('risk_approved', False):
        return {
            'trade_executed': False,
            'reason': event.get('risk_reason', 'Risk check failed')
        }

    stock_symbol = signal['stock_symbol']
    direction = signal['direction']
    sentiment_change = signal['sentiment_change']
    confidence = signal['confidence']

    # Check market hours
    if MARKET_HOURS_ONLY and not is_market_open():
        return {
            'trade_executed': False,
            'reason': 'Market is closed'
        }

    # Get current stock price
    current_price = get_current_price(stock_symbol)

    if not current_price:
        return {
            'trade_executed': False,
            'reason': 'Unable to fetch current price'
        }

    # Calculate position size based on sentiment strength
    position_size = calculate_position_size(
        stock_symbol,
        sentiment_change,
        confidence,
        current_price
    )

    if position_size == 0:
        return {
            'trade_executed': False,
            'reason': 'Position size too small'
        }

    # Execute trade
    if ENABLE_REAL_TRADING:
        trade_result = execute_alpaca_trade(
            stock_symbol,
            direction,
            position_size,
            current_price
        )
    else:
        trade_result = simulate_trade(
            stock_symbol,
            direction,
            position_size,
            current_price
        )

    # Update portfolio
    update_portfolio(stock_symbol, direction, position_size, current_price)

    # Log trade
    log_trade(signal, trade_result, position_size, current_price)

    # Send alert
    send_trade_alert(stock_symbol, direction, position_size, current_price, trade_result)

    return {
        'trade_executed': True,
        'stock_symbol': stock_symbol,
        'direction': direction,
        'quantity': position_size,
        'price': current_price,
        'trade_id': trade_result['trade_id'],
    }

def is_market_open() -> bool:
    """Check if market is currently open"""
    try:
        api = get_alpaca_client()
        clock = api.get_clock()
        return clock.is_open
    except Exception:
        # Naive fallback: Lambda clocks run in UTC, so this approximation
        # ignores the US Eastern timezone and market holidays
        now = datetime.now()
        return now.weekday() < 5 and 9 <= now.hour < 16

def get_current_price(symbol: str) -> float:
    """Get current stock price from Alpaca"""
    try:
        api = get_alpaca_client()
        quote = api.get_latest_trade(symbol)
        return float(quote.price)
    except Exception as e:
        print(f"Error fetching price for {symbol}: {e}")
        return None

def calculate_position_size(
    symbol: str,
    sentiment_change: float,
    confidence: float,
    price: float
) -> int:
    """Calculate number of shares to trade"""

    # Get current portfolio
    portfolio_table = dynamodb.Table(PORTFOLIO_TABLE)

    try:
        response = portfolio_table.get_item(Key={'stock_symbol': symbol})
        current_position = int(response.get('Item', {}).get('quantity', 0))
    except Exception:
        current_position = 0

    # Calculate base position size (as percentage of max allocation)
    signal_strength = abs(sentiment_change) * confidence
    max_position_value = 10000  # Hardcoded for brevity; read max_position from stocks-config in practice

    target_value = max_position_value * signal_strength
    target_shares = int(target_value / price)

    # Limit to reasonable position changes
    max_change = max(10, int(target_shares * 0.3))  # Max 30% change
    position_change = min(max_change, target_shares)

    return position_change

def execute_alpaca_trade(
    symbol: str,
    direction: str,
    quantity: int,
    price: float
) -> dict:
    """Execute real trade via Alpaca API"""

    try:
        api = get_alpaca_client()

        side = 'buy' if direction == 'BUY' else 'sell'

        order = api.submit_order(
            symbol=symbol,
            qty=quantity,
            side=side,
            type='market',
            time_in_force='day'
        )

        return {
            'trade_id': order.id,
            'status': order.status,
            'filled_price': float(order.filled_avg_price) if order.filled_avg_price else price,
            'simulated': False,
        }

    except Exception as e:
        print(f"Alpaca trade error: {e}")
        return {
            'trade_id': 'ERROR',
            'status': 'failed',
            'error': str(e),
            'simulated': False,
        }

def simulate_trade(
    symbol: str,
    direction: str,
    quantity: int,
    price: float
) -> dict:
    """Simulate trade for testing"""

    import uuid

    return {
        'trade_id': str(uuid.uuid4()),
        'status': 'filled',
        'filled_price': price,
        'simulated': True,
    }

def update_portfolio(symbol: str, direction: str, quantity: int, price: float):
    """Update portfolio state in DynamoDB"""

    table = dynamodb.Table(PORTFOLIO_TABLE)

    # Get current position
    try:
        response = table.get_item(Key={'stock_symbol': symbol})
        item = response.get('Item', {})
        current_qty = int(item.get('quantity', 0))
        current_value = float(item.get('total_value', 0))
    except Exception:
        current_qty = 0
        current_value = 0

    # Update position
    if direction == 'BUY':
        new_qty = current_qty + quantity
        new_value = current_value + (quantity * price)
    else:  # SELL
        new_qty = max(0, current_qty - quantity)
        new_value = max(0, current_value - (quantity * price))

    table.put_item(Item={
        'stock_symbol': symbol,
        'quantity': new_qty,
        'total_value': Decimal(str(new_value)),
        'avg_price': Decimal(str(new_value / new_qty)) if new_qty > 0 else Decimal('0'),
        'last_updated': int(datetime.utcnow().timestamp()),
    })

def log_trade(signal: dict, trade_result: dict, quantity: int, price: float):
    """Log trade to DynamoDB"""

    table = dynamodb.Table(TRADE_HISTORY_TABLE)
    timestamp = int(datetime.utcnow().timestamp())

    table.put_item(Item={
        'trade_id': trade_result['trade_id'],
        'timestamp': timestamp,
        'stock_symbol': signal['stock_symbol'],
        'direction': signal['direction'],
        'quantity': quantity,
        'price': Decimal(str(price)),
        'filled_price': Decimal(str(trade_result.get('filled_price', price))),
        'sentiment_change': Decimal(str(signal['sentiment_change'])),
        'confidence': Decimal(str(signal['confidence'])),
        'status': trade_result['status'],
        'simulated': trade_result.get('simulated', False),
    })

def send_trade_alert(symbol: str, direction: str, quantity: int, price: float, result: dict):
    """Send SNS notification"""

    total_value = quantity * price
    simulated_tag = '[SIMULATED]' if result.get('simulated') else '[REAL]'

    message = f"""
Sentiment-Driven Trade Executed {simulated_tag}

Stock: {symbol}
Direction: {direction}
Quantity: {quantity} shares
Price: ${price:.2f}
Total Value: ${total_value:.2f}
Trade ID: {result['trade_id']}
Status: {result['status']}
    """

    sns_client.publish(
        TopicArn=ALERT_TOPIC_ARN,
        Subject=f'Trade Alert: {direction} {symbol}',
        Message=message
    )

Deployment

Deploy the complete sentiment trading system:

# Install dependencies
npm install

# Deploy to development (simulated trading)
cdk deploy --all --context env=dev

# Deploy to production (requires Twitter and Alpaca API credentials)
cdk deploy --all --context env=prod \
    --context enableRealTrading=true \
    --context maxPortfolio=100000

Monitoring Dashboard

Create CloudWatch dashboard for real-time monitoring:

new MonitoringConstruct(this, 'Monitoring', {
  tweetStream,
  sentimentTable,
  portfolioTable,
  metrics: [
    'TweetIngestionRate',
    'SentimentScoreDistribution',
    'TradingSignalsGenerated',
    'TradesExecuted',
    'PortfolioValue',
    'PortfolioPerformance'
  ]
});
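
These dashboard metrics imply that the Lambdas publish custom CloudWatch metrics, whereas the code in this post only uses print(). A sketch of emitting one of them from the signal aggregator follows; the namespace name is an assumption:

import boto3

cloudwatch = boto3.client('cloudwatch')

def publish_signal_metric(count: int) -> None:
    """Emit a custom metric so the dashboard has data to chart."""
    cloudwatch.put_metric_data(
        Namespace='SentimentTrading',  # assumed namespace
        MetricData=[{
            'MetricName': 'TradingSignalsGenerated',
            'Value': count,
            'Unit': 'Count',
        }],
    )

# e.g. at the end of the signal aggregator handler:
# publish_signal_metric(len(trading_signals))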

Security & Compliance

  1. API Key Security: Store all credentials in Secrets Manager with rotation (a rotation-friendly caching pattern is sketched after this list)
  2. Data Encryption: Enable encryption at rest for all DynamoDB tables and Kinesis streams
  3. IAM Least Privilege: Each Lambda has minimal required permissions
  4. Audit Logging: CloudTrail logs all API calls and trades
  5. SEC Compliance: Maintain complete trade history for regulatory requirements
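
One caveat on item 1: the Lambdas above cache secrets in a module global for the lifetime of the execution environment, so a rotated key is only picked up on a cold start. A TTL-based cache like this sketch avoids that; the helper name and TTL value are illustrative:

import json
import time
import boto3

secrets_client = boto3.client('secretsmanager')

_cache: dict = {}
CACHE_TTL_SECONDS = 300  # re-fetch every 5 minutes so rotated keys get picked up

def get_secret(secret_name: str) -> dict:
    """Fetch a JSON secret, caching briefly so rotation takes effect quickly."""
    entry = _cache.get(secret_name)
    if entry and time.time() - entry['fetched_at'] < CACHE_TTL_SECONDS:
        return entry['value']
    value = json.loads(
        secrets_client.get_secret_value(SecretId=secret_name)['SecretString']
    )
    _cache[secret_name] = {'value': value, 'fetched_at': time.time()}
    return value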

Cost Optimization

  • Lambda Memory: Right-size sentiment analysis functions (3GB for NLP)
  • Kinesis Shards: Start with 2 shards, scale based on tweet volume
  • DynamoDB: Use on-demand billing for unpredictable workloads
  • Savings Plans: Consider a Compute Savings Plan once Lambda usage becomes steady and predictable

Conclusion

This sentiment-driven trading system demonstrates the power of combining social media intelligence with cloud-native architecture. By analyzing X.com activity in real time and ensembling multiple ML models for sentiment analysis, the system can often surface trading signals before they appear in traditional news coverage.

Key Takeaways

  • Real-Time Processing: Kinesis Data Streams buffer millions of tweets per hour with sub-second ingestion latency
  • Multi-Model Sentiment: Ensemble approach improves accuracy using Bedrock, FinBERT, and Comprehend
  • Risk Management: Multi-layered safety checks prevent catastrophic losses
  • Scalable Architecture: Fully serverless design scales automatically with tweet volume
  • Production-Ready: Complete monitoring, alerting, and compliance features

Disclaimer: This system is for educational purposes. Stock trading involves substantial risk. Social media sentiment can be manipulated. Always test thoroughly, never invest more than you can afford to lose, and comply with all securities regulations.

Yen