🎯 Introduction
Database sharding is a critical technique for achieving horizontal scalability in large-scale applications. As data volumes grow beyond what a single database instance can handle efficiently, sharding becomes essential for maintaining performance and availability. This comprehensive guide explores MySQL sharding strategies, comparing different approaches, implementation patterns, and real-world considerations.
Sharding involves distributing data across multiple database instances (shards), where each shard contains a subset of the total data. This approach enables applications to scale beyond the limitations of vertical scaling and provides better resource utilization across multiple servers.
📚 Understanding Database Sharding
🔍 What is Sharding?
graph TD
A[Application Layer] --> B[Shard Router/Proxy]
B --> C[Shard 1<br/>Users 1-1000]
B --> D[Shard 2<br/>Users 1001-2000]
B --> E[Shard 3<br/>Users 2001-3000]
B --> F["Shard N<br/>Users (N-1)*1000+1 to N*1000"]
C --> G[(MySQL Instance 1)]
D --> H[(MySQL Instance 2)]
E --> I[(MySQL Instance 3)]
F --> J[(MySQL Instance N)]
style A fill:#ff6b6b
style B fill:#4ecdc4
style G fill:#feca57
style H fill:#feca57
style I fill:#feca57
style J fill:#feca57
Sharding splits a large database into smaller, more manageable pieces called shards. Each shard is an independent database that contains a subset of the application’s data. The application uses a shard key to determine which shard should store or retrieve specific data.
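At its core, the routing decision is just a function from shard key to shard. A minimal sketch (hypothetical `ShardRouter` class, using modulo placement, which is one of several possible schemes):

```java
public class ShardRouter {
    private final int numShards;

    public ShardRouter(int numShards) {
        this.numShards = numShards;
    }

    // Map a shard key (here a user id) to a shard index.
    // floorMod keeps the result non-negative even for negative hash codes.
    public int route(long userId) {
        return Math.floorMod(Long.hashCode(userId), numShards);
    }

    public static void main(String[] args) {
        ShardRouter router = new ShardRouter(4);
        System.out.println("user 12345 -> shard " + router.route(12345L));
    }
}
```

Every read and write for a given key must go through the same function, so the routing logic is usually centralized in one component rather than scattered across services.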
🎯 When to Consider Sharding
Consider sharding when:
- Single database performance becomes a bottleneck
- Data size exceeds storage capacity of single instance
- Read/write operations exceed single server capabilities
- High availability requirements across geographic regions
- Cost optimization through distributed infrastructure
Avoid sharding if:
- Application can scale vertically (more CPU/RAM/storage)
- Read replicas can solve read performance issues
- Data size is manageable for single instance
- Application complexity increase isn’t justified
🛠️ MySQL Sharding Strategies
1. Range-Based Sharding
Range-based sharding distributes data based on ranges of the shard key values.
graph TD
A[Shard Key: User ID] --> B{Range Check}
B -->|1-10000| C[Shard 1]
B -->|10001-20000| D[Shard 2]
B -->|20001-30000| E[Shard 3]
B -->|30001+| F[Shard N]
C --> G[MySQL Server 1<br/>user_id: 1-10000]
D --> H[MySQL Server 2<br/>user_id: 10001-20000]
E --> I[MySQL Server 3<br/>user_id: 20001-30000]
F --> J[MySQL Server N<br/>user_id: 30001+]
style A fill:#ff6b6b
style B fill:#4ecdc4
style G fill:#feca57
style H fill:#feca57
style I fill:#feca57
style J fill:#feca57
🛠️ Range-Based Implementation
@Component
public class RangeBasedShardingStrategy implements ShardingStrategy {

    private final Map<String, ShardRange> shardRanges;
    private final Map<String, DataSource> dataSources;

    public RangeBasedShardingStrategy() {
        this.shardRanges = new HashMap<>();
        this.dataSources = new HashMap<>();
        initializeShards();
    }

    private void initializeShards() {
        // Define shard ranges
        shardRanges.put("shard1", new ShardRange(1L, 10000L));
        shardRanges.put("shard2", new ShardRange(10001L, 20000L));
        shardRanges.put("shard3", new ShardRange(20001L, 30000L));
        shardRanges.put("shard4", new ShardRange(30001L, Long.MAX_VALUE));

        // Initialize data sources
        dataSources.put("shard1", createDataSource("jdbc:mysql://db1:3306/shard1"));
        dataSources.put("shard2", createDataSource("jdbc:mysql://db2:3306/shard2"));
        dataSources.put("shard3", createDataSource("jdbc:mysql://db3:3306/shard3"));
        dataSources.put("shard4", createDataSource("jdbc:mysql://db4:3306/shard4"));
    }

    @Override
    public String determineShardKey(Object shardingValue) {
        Long value = (Long) shardingValue;

        return shardRanges.entrySet().stream()
            .filter(entry -> {
                ShardRange range = entry.getValue();
                return value >= range.getMin() && value <= range.getMax();
            })
            .map(Map.Entry::getKey)
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("No shard found for value: " + value));
    }

    @Override
    public DataSource getDataSource(String shardKey) {
        DataSource dataSource = dataSources.get(shardKey);
        if (dataSource == null) {
            throw new IllegalArgumentException("No data source found for shard: " + shardKey);
        }
        return dataSource;
    }

    @Override
    public List<String> getAllShardKeys() {
        return new ArrayList<>(shardRanges.keySet());
    }

    // Range query optimization: find only the shards whose ranges overlap the query
    public List<String> getShardsForRange(Long minValue, Long maxValue) {
        return shardRanges.entrySet().stream()
            .filter(entry -> {
                ShardRange range = entry.getValue();
                // Two ranges overlap unless one ends before the other begins
                return !(maxValue < range.getMin() || minValue > range.getMax());
            })
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    private HikariDataSource createDataSource(String jdbcUrl) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(jdbcUrl);
        config.setUsername("app_user");
        config.setPassword("app_password");
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(600000);
        config.setMaxLifetime(1800000);
        return new HikariDataSource(config);
    }

    // Supporting classes
    public static class ShardRange {
        private final Long min;
        private final Long max;

        public ShardRange(Long min, Long max) {
            this.min = min;
            this.max = max;
        }

        public Long getMin() { return min; }
        public Long getMax() { return max; }

        @Override
        public String toString() {
            return String.format("ShardRange{min=%d, max=%s}",
                min, max == Long.MAX_VALUE ? "∞" : max);
        }
    }
}

// User service with range-based sharding
@Service
public class UserService {

    private final RangeBasedShardingStrategy shardingStrategy;

    public UserService(RangeBasedShardingStrategy shardingStrategy) {
        this.shardingStrategy = shardingStrategy;
    }

    // Build a template per shard rather than mutating a shared JdbcTemplate's
    // DataSource per call, which is not safe under concurrent requests
    private JdbcTemplate templateFor(String shardKey) {
        return new JdbcTemplate(shardingStrategy.getDataSource(shardKey));
    }

    public User findById(Long userId) {
        String shardKey = shardingStrategy.determineShardKey(userId);

        return templateFor(shardKey).queryForObject(
            "SELECT * FROM users WHERE id = ?",
            new UserRowMapper(),
            userId
        );
    }

    public List<User> findByIdRange(Long minId, Long maxId) {
        List<String> relevantShards = shardingStrategy.getShardsForRange(minId, maxId);
        List<User> allUsers = new ArrayList<>();

        for (String shardKey : relevantShards) {
            List<User> shardUsers = templateFor(shardKey).query(
                "SELECT * FROM users WHERE id BETWEEN ? AND ?",
                new UserRowMapper(),
                minId, maxId
            );

            allUsers.addAll(shardUsers);
        }

        // Results arrive per shard; re-sort for a globally ordered list
        return allUsers.stream()
            .sorted(Comparator.comparing(User::getId))
            .collect(Collectors.toList());
    }

    public void save(User user) {
        String shardKey = shardingStrategy.determineShardKey(user.getId());

        templateFor(shardKey).update(
            "INSERT INTO users (id, username, email, created_at) VALUES (?, ?, ?, ?)",
            user.getId(),
            user.getUsername(),
            user.getEmail(),
            user.getCreatedAt()
        );
    }

    // Batch operations: group rows by shard so each shard gets one batch insert
    public void saveBatch(List<User> users) {
        Map<String, List<User>> usersByShard = users.stream()
            .collect(Collectors.groupingBy(user ->
                shardingStrategy.determineShardKey(user.getId())));

        usersByShard.forEach((shardKey, shardUsers) -> {
            List<Object[]> batchArgs = shardUsers.stream()
                .map(user -> new Object[]{
                    user.getId(), user.getUsername(),
                    user.getEmail(), user.getCreatedAt()
                })
                .collect(Collectors.toList());

            templateFor(shardKey).batchUpdate(
                "INSERT INTO users (id, username, email, created_at) VALUES (?, ?, ?, ?)",
                batchArgs
            );
        });
    }
}
✅ Range-Based Sharding Pros and Cons
✅ Advantages:
- Simple to understand and implement
- Efficient for range queries
- Easy to add new shards for growing data
- Sequential data stays together
❌ Disadvantages:
- Hot spots with uneven data distribution
- Difficult to balance load across shards
- Range boundaries may become outdated
- Sequential access patterns can overload single shards
🎯 Use Cases:
- Time-series data (timestamps)
- Sequential IDs or auto-increment keys
- Geographic data distribution
- Log data partitioned by date
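For the time-series use case, the range function can be as simple as formatting a timestamp into a per-period shard name. A sketch assuming one shard per month (the `logs_` naming scheme is hypothetical):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DateRangeShard {
    private static final DateTimeFormatter MONTH =
        DateTimeFormatter.ofPattern("yyyy_MM");

    // Map a date to a monthly shard name, e.g. 2024-03-15 -> "logs_2024_03".
    static String shardFor(LocalDate date) {
        return "logs_" + date.format(MONTH);
    }

    public static void main(String[] args) {
        System.out.println(shardFor(LocalDate.of(2024, 3, 15)));
    }
}
```

A nice property of this layout is that retention becomes cheap: expiring old data means dropping an entire shard rather than running large DELETEs.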
2. Hash-Based Sharding
Hash-based sharding uses a hash function to distribute data evenly across shards.
graph TD
A[Shard Key: User ID] --> B["Hash Function<br/>hash(user_id) % num_shards"]
B --> C[Result: 0<br/>Shard 1]
B --> D[Result: 1<br/>Shard 2]
B --> E[Result: 2<br/>Shard 3]
B --> F[Result: 3<br/>Shard 4]
C --> G[MySQL Server 1<br/>Hash Result: 0]
D --> H[MySQL Server 2<br/>Hash Result: 1]
E --> I[MySQL Server 3<br/>Hash Result: 2]
F --> J[MySQL Server 4<br/>Hash Result: 3]
style A fill:#ff6b6b
style B fill:#4ecdc4
style G fill:#feca57
style H fill:#feca57
style I fill:#feca57
style J fill:#feca57
🛠️ Hash-Based Implementation
@Component
public class HashBasedShardingStrategy implements ShardingStrategy {

    private final List<String> shardKeys;
    private final Map<String, DataSource> dataSources;
    private final int numberOfShards;

    public HashBasedShardingStrategy() {
        this.numberOfShards = 4;
        this.shardKeys = Arrays.asList("shard0", "shard1", "shard2", "shard3");
        this.dataSources = new HashMap<>();
        initializeShards();
    }

    private void initializeShards() {
        dataSources.put("shard0", createDataSource("jdbc:mysql://db1:3306/shard0"));
        dataSources.put("shard1", createDataSource("jdbc:mysql://db2:3306/shard1"));
        dataSources.put("shard2", createDataSource("jdbc:mysql://db3:3306/shard2"));
        dataSources.put("shard3", createDataSource("jdbc:mysql://db4:3306/shard3"));
    }

    @Override
    public String determineShardKey(Object shardingValue) {
        int hash = hashFunction(shardingValue);
        // floorMod avoids the Math.abs(Integer.MIN_VALUE) pitfall, which stays negative
        int shardIndex = Math.floorMod(hash, numberOfShards);
        return shardKeys.get(shardIndex);
    }

    private int hashFunction(Object value) {
        if (value == null) {
            return 0;
        }

        // Plain Object.hashCode(); adequate when keys are already well distributed
        return value.hashCode();
    }

    // MD5-based variant for a more uniform distribution. Note: this is still
    // modulo placement, not true consistent hashing, so resharding stays costly
    public String determineShardKeyConsistent(Object shardingValue) {
        if (shardingValue == null) {
            return shardKeys.get(0);
        }

        String input = String.valueOf(shardingValue);
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] hashBytes = md.digest(input.getBytes(StandardCharsets.UTF_8));

            // Fold the first four digest bytes into an int
            int hash = 0;
            for (int i = 0; i < 4; i++) {
                hash = (hash << 8) + (hashBytes[i] & 0xff);
            }

            int shardIndex = Math.floorMod(hash, numberOfShards);
            return shardKeys.get(shardIndex);

        } catch (NoSuchAlgorithmException e) {
            // Fallback to the simple hash
            return determineShardKey(shardingValue);
        }
    }

    @Override
    public DataSource getDataSource(String shardKey) {
        DataSource dataSource = dataSources.get(shardKey);
        if (dataSource == null) {
            throw new IllegalArgumentException("No data source found for shard: " + shardKey);
        }
        return dataSource;
    }

    @Override
    public List<String> getAllShardKeys() {
        return new ArrayList<>(shardKeys);
    }

    // For operations that need to query all shards (scatter-gather)
    public <T> List<T> queryAllShards(Function<DataSource, List<T>> queryFunction) {
        return shardKeys.parallelStream()
            .map(shardKey -> {
                DataSource dataSource = dataSources.get(shardKey);
                return queryFunction.apply(dataSource);
            })
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }

    // Shard distribution analysis
    public Map<String, Integer> analyzeDistribution(List<Object> sampleKeys) {
        return sampleKeys.stream()
            .collect(Collectors.groupingBy(
                this::determineShardKey,
                Collectors.collectingAndThen(
                    Collectors.counting(),
                    Math::toIntExact
                )
            ));
    }

    private HikariDataSource createDataSource(String jdbcUrl) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(jdbcUrl);
        config.setUsername("app_user");
        config.setPassword("app_password");
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(600000);
        config.setMaxLifetime(1800000);
        config.setLeakDetectionThreshold(60000);
        return new HikariDataSource(config);
    }
}

// Product service with hash-based sharding
@Service
public class ProductService {

    private final HashBasedShardingStrategy shardingStrategy;

    public ProductService(HashBasedShardingStrategy shardingStrategy) {
        this.shardingStrategy = shardingStrategy;
    }

    // One template per shard; mutating a shared JdbcTemplate's DataSource is
    // not thread-safe, especially under the parallel cross-shard queries below
    private JdbcTemplate templateFor(String shardKey) {
        return new JdbcTemplate(shardingStrategy.getDataSource(shardKey));
    }

    public Product findById(String productId) {
        String shardKey = shardingStrategy.determineShardKey(productId);

        try {
            return templateFor(shardKey).queryForObject(
                "SELECT * FROM products WHERE id = ?",
                new ProductRowMapper(),
                productId
            );
        } catch (EmptyResultDataAccessException e) {
            return null;
        }
    }

    public List<Product> findByIds(List<String> productIds) {
        // Group by shard to minimize database round trips
        Map<String, List<String>> idsByShard = productIds.stream()
            .collect(Collectors.groupingBy(shardingStrategy::determineShardKey));

        List<Product> allProducts = new ArrayList<>();

        idsByShard.forEach((shardKey, ids) -> {
            String inClause = String.join(",", Collections.nCopies(ids.size(), "?"));
            String query = "SELECT * FROM products WHERE id IN (" + inClause + ")";

            List<Product> shardProducts = templateFor(shardKey).query(
                query,
                new ProductRowMapper(),
                ids.toArray()
            );

            allProducts.addAll(shardProducts);
        });

        return allProducts;
    }

    public void save(Product product) {
        String shardKey = shardingStrategy.determineShardKey(product.getId());

        templateFor(shardKey).update(
            "INSERT INTO products (id, name, price, category_id, created_at) VALUES (?, ?, ?, ?, ?) " +
            "ON DUPLICATE KEY UPDATE name = VALUES(name), price = VALUES(price), " +
            "category_id = VALUES(category_id), updated_at = NOW()",
            product.getId(),
            product.getName(),
            product.getPrice(),
            product.getCategoryId(),
            product.getCreatedAt()
        );
    }

    // Global search across all shards; each shard gets its own template
    public List<Product> searchByName(String namePattern, int limit) {
        List<Product> allResults = shardingStrategy.queryAllShards(dataSource ->
            new JdbcTemplate(dataSource).query(
                "SELECT * FROM products WHERE name LIKE ? LIMIT ?",
                new ProductRowMapper(),
                "%" + namePattern + "%", limit
            )
        );

        // Each shard applied the limit locally; merge, sort, and re-limit globally
        return allResults.stream()
            .sorted(Comparator.comparing(Product::getName))
            .limit(limit)
            .collect(Collectors.toList());
    }

    // Analytics query across all shards
    public Map<String, Object> getProductStatistics() {
        List<Map<String, Object>> shardStats = shardingStrategy.queryAllShards(dataSource ->
            new JdbcTemplate(dataSource).queryForList(
                "SELECT " +
                "  COUNT(*) as total_products, " +
                "  AVG(price) as avg_price, " +
                "  MIN(price) as min_price, " +
                "  MAX(price) as max_price " +
                "FROM products"
            )
        );

        // Aggregate results from all shards (empty shards return NULL aggregates)
        long totalProducts = shardStats.stream()
            .mapToLong(stat -> ((Number) stat.get("total_products")).longValue())
            .sum();

        // A global average must be weighted by each shard's row count;
        // averaging the per-shard averages would skew toward small shards
        double weightedPriceSum = shardStats.stream()
            .filter(stat -> stat.get("avg_price") != null)
            .mapToDouble(stat -> ((Number) stat.get("avg_price")).doubleValue()
                * ((Number) stat.get("total_products")).longValue())
            .sum();
        double avgPrice = totalProducts > 0 ? weightedPriceSum / totalProducts : 0.0;

        double minPrice = shardStats.stream()
            .filter(stat -> stat.get("min_price") != null)
            .mapToDouble(stat -> ((Number) stat.get("min_price")).doubleValue())
            .min()
            .orElse(0.0);

        double maxPrice = shardStats.stream()
            .filter(stat -> stat.get("max_price") != null)
            .mapToDouble(stat -> ((Number) stat.get("max_price")).doubleValue())
            .max()
            .orElse(0.0);

        Map<String, Object> aggregatedStats = new HashMap<>();
        aggregatedStats.put("total_products", totalProducts);
        aggregatedStats.put("avg_price", avgPrice);
        aggregatedStats.put("min_price", minPrice);
        aggregatedStats.put("max_price", maxPrice);

        return aggregatedStats;
    }
}
✅ Hash-Based Sharding Pros and Cons
✅ Advantages:
- Even data distribution
- Eliminates hot spots
- Simple to implement
- Good performance for point queries
❌ Disadvantages:
- Difficult to perform range queries
- Complex resharding when adding/removing shards
- Cross-shard joins are expensive
- No data locality for related records
🎯 Use Cases:
- User data sharded by user ID
- Product catalogs
- Session data
- Cache-like access patterns
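The resharding drawback is easy to quantify: with modulo placement, changing the shard count remaps most keys. A small demonstration with 10,000 synthetic sequential keys, counting how many change shards when growing from 4 to 5 shards:

```java
public class ModShardDemo {
    // Modulo placement, as in the strategy above.
    static int shardFor(long key, int numShards) {
        return Math.floorMod(Long.hashCode(key), numShards);
    }

    // Count keys whose shard assignment changes when the shard count changes.
    static int movedKeys(int fromShards, int toShards, int totalKeys) {
        int moved = 0;
        for (long k = 0; k < totalKeys; k++) {
            if (shardFor(k, fromShards) != shardFor(k, toShards)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println("keys moved 4 -> 5 shards: " + movedKeys(4, 5, 10000));
    }
}
```

Only keys whose residues coincide mod 4 and mod 5 (4 in every 20) stay put, so 8,000 of the 10,000 keys must be migrated. A true consistent-hashing ring would move only about 1/5 of the keys for the same change, which is why it is the usual remedy for this problem.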
3. Directory-Based Sharding
Directory-based sharding uses a lookup service to determine the shard location.
graph TD
A[Application] --> B[Shard Directory/Router]
B --> C[(Directory Database)]
B --> D[Shard 1<br/>Customer A, D, G]
B --> E[Shard 2<br/>Customer B, E, H]
B --> F[Shard 3<br/>Customer C, F, I]
C --> G[Mapping Table<br/>tenant_id -> shard_id]
style A fill:#ff6b6b
style B fill:#4ecdc4
style C fill:#feca57
style D fill:#45b7d1
style E fill:#45b7d1
style F fill:#45b7d1
🛠️ Directory-Based Implementation
// Shard directory service
@Service
public class ShardDirectoryService {

    private final JdbcTemplate directoryJdbcTemplate;
    private final Map<String, DataSource> shardDataSources;
    private final Cache<String, String> shardCache;

    public ShardDirectoryService(DataSource directoryDataSource) {
        this.directoryJdbcTemplate = new JdbcTemplate(directoryDataSource);
        this.shardDataSources = new ConcurrentHashMap<>();
        this.shardCache = Caffeine.newBuilder()
            .maximumSize(10000)
            .expireAfterWrite(30, TimeUnit.MINUTES)
            .build();

        initializeShardDataSources();
    }

    private void initializeShardDataSources() {
        // Initialize shard data sources
        shardDataSources.put("shard-us-east", createDataSource("jdbc:mysql://db-us-east:3306/tenant_data"));
        shardDataSources.put("shard-us-west", createDataSource("jdbc:mysql://db-us-west:3306/tenant_data"));
        shardDataSources.put("shard-eu", createDataSource("jdbc:mysql://db-eu:3306/tenant_data"));
        shardDataSources.put("shard-asia", createDataSource("jdbc:mysql://db-asia:3306/tenant_data"));
    }

    // Exposed read-only so other services can iterate shards without
    // reaching into this class's internals
    public Map<String, DataSource> getShardDataSources() {
        return Collections.unmodifiableMap(shardDataSources);
    }

    public String determineShardKey(String tenantId) {
        // Check cache first
        String cachedShard = shardCache.getIfPresent(tenantId);
        if (cachedShard != null) {
            return cachedShard;
        }

        // Query directory database
        try {
            String shardKey = directoryJdbcTemplate.queryForObject(
                "SELECT shard_key FROM tenant_shard_mapping WHERE tenant_id = ?",
                String.class,
                tenantId
            );

            // Cache the result
            shardCache.put(tenantId, shardKey);
            return shardKey;

        } catch (EmptyResultDataAccessException e) {
            // Auto-assign to the least loaded shard
            String assignedShard = assignToOptimalShard(tenantId);
            shardCache.put(tenantId, assignedShard);
            return assignedShard;
        }
    }

    private String assignToOptimalShard(String tenantId) {
        // Get shard load statistics
        List<ShardLoadInfo> shardLoads = getShardLoadStatistics();

        // Find the least loaded shard
        String optimalShard = shardLoads.stream()
            .min(Comparator.comparing(ShardLoadInfo::getLoadScore))
            .map(ShardLoadInfo::getShardKey)
            .orElse("shard-us-east"); // Default fallback

        // Register tenant to shard
        directoryJdbcTemplate.update(
            "INSERT INTO tenant_shard_mapping (tenant_id, shard_key, assigned_at) VALUES (?, ?, NOW())",
            tenantId, optimalShard
        );

        System.out.println("Assigned tenant " + tenantId + " to shard " + optimalShard);
        return optimalShard;
    }

    private List<ShardLoadInfo> getShardLoadStatistics() {
        return directoryJdbcTemplate.query(
            "SELECT " +
            "  s.shard_key, " +
            "  COUNT(tsm.tenant_id) as tenant_count, " +
            "  s.max_capacity, " +
            "  s.current_connections, " +
            "  s.cpu_usage, " +
            "  s.memory_usage " +
            "FROM shards s " +
            "LEFT JOIN tenant_shard_mapping tsm ON s.shard_key = tsm.shard_key " +
            "GROUP BY s.shard_key",
            (rs, rowNum) -> new ShardLoadInfo(
                rs.getString("shard_key"),
                rs.getInt("tenant_count"),
                rs.getInt("max_capacity"),
                rs.getInt("current_connections"),
                rs.getDouble("cpu_usage"),
                rs.getDouble("memory_usage")
            )
        );
    }

    public DataSource getDataSource(String shardKey) {
        DataSource dataSource = shardDataSources.get(shardKey);
        if (dataSource == null) {
            throw new IllegalArgumentException("No data source found for shard: " + shardKey);
        }
        return dataSource;
    }

    // Move tenant to a different shard (resharding).
    // Note: @Transactional only covers the directory update; the cross-database
    // copy and delete are not atomic and need verification/retry in production
    @Transactional
    public void moveTenant(String tenantId, String newShardKey) {
        String currentShardKey = determineShardKey(tenantId);

        if (currentShardKey.equals(newShardKey)) {
            return; // Already in target shard
        }

        // 1. Copy data to new shard
        copyTenantData(tenantId, currentShardKey, newShardKey);

        // 2. Update directory mapping
        directoryJdbcTemplate.update(
            "UPDATE tenant_shard_mapping SET shard_key = ?, moved_at = NOW() WHERE tenant_id = ?",
            newShardKey, tenantId
        );

        // 3. Clear cache
        shardCache.invalidate(tenantId);

        // 4. Delete data from old shard (after verification)
        deleteTenantDataFromShard(tenantId, currentShardKey);

        System.out.println("Moved tenant " + tenantId + " from " + currentShardKey + " to " + newShardKey);
    }

    private void copyTenantData(String tenantId, String sourceShardKey, String targetShardKey) {
        DataSource sourceDS = getDataSource(sourceShardKey);
        DataSource targetDS = getDataSource(targetShardKey);

        JdbcTemplate sourceJdbc = new JdbcTemplate(sourceDS);
        JdbcTemplate targetJdbc = new JdbcTemplate(targetDS);

        // Copy all tables for the tenant
        String[] tables = {"orders", "order_items", "customers", "payments"};

        for (String table : tables) {
            List<Map<String, Object>> data = sourceJdbc.queryForList(
                "SELECT * FROM " + table + " WHERE tenant_id = ?",
                tenantId
            );

            if (!data.isEmpty()) {
                copyTableData(targetJdbc, table, data);
            }
        }
    }

    private void copyTableData(JdbcTemplate targetJdbc, String tableName, List<Map<String, Object>> data) {
        if (data.isEmpty()) return;

        // Generate INSERT statement dynamically; table/column names come from
        // our own schema, never from user input
        Map<String, Object> firstRow = data.get(0);
        String columns = String.join(", ", firstRow.keySet());
        String placeholders = String.join(", ", Collections.nCopies(firstRow.size(), "?"));

        String insertSql = String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, columns, placeholders);

        List<Object[]> batchArgs = data.stream()
            .map(row -> firstRow.keySet().stream()
                .map(row::get)
                .toArray())
            .collect(Collectors.toList());

        targetJdbc.batchUpdate(insertSql, batchArgs);
    }

    private void deleteTenantDataFromShard(String tenantId, String shardKey) {
        DataSource dataSource = getDataSource(shardKey);
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

        String[] tables = {"order_items", "payments", "orders", "customers"}; // Order matters for FK constraints

        for (String table : tables) {
            int deletedRows = jdbcTemplate.update(
                "DELETE FROM " + table + " WHERE tenant_id = ?",
                tenantId
            );
            System.out.println("Deleted " + deletedRows + " rows from " + table + " for tenant " + tenantId);
        }
    }

    // Health check for shards
    public Map<String, Boolean> checkShardHealth() {
        Map<String, Boolean> healthStatus = new ConcurrentHashMap<>();

        shardDataSources.entrySet().parallelStream().forEach(entry -> {
            String shardKey = entry.getKey();
            DataSource dataSource = entry.getValue();

            try {
                JdbcTemplate jdbc = new JdbcTemplate(dataSource);
                jdbc.queryForObject("SELECT 1", Integer.class);
                healthStatus.put(shardKey, true);
            } catch (Exception e) {
                healthStatus.put(shardKey, false);
                System.err.println("Health check failed for shard " + shardKey + ": " + e.getMessage());
            }
        });

        return healthStatus;
    }

    private HikariDataSource createDataSource(String jdbcUrl) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(jdbcUrl);
        config.setUsername("app_user");
        config.setPassword("app_password");
        config.setMaximumPoolSize(15);
        config.setMinimumIdle(3);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(300000);
        config.setMaxLifetime(1800000);
        return new HikariDataSource(config);
    }

    // Supporting classes
    public static class ShardLoadInfo {
        private final String shardKey;
        private final int tenantCount;
        private final int maxCapacity;
        private final int currentConnections;
        private final double cpuUsage;
        private final double memoryUsage;

        public ShardLoadInfo(String shardKey, int tenantCount, int maxCapacity,
                             int currentConnections, double cpuUsage, double memoryUsage) {
            this.shardKey = shardKey;
            this.tenantCount = tenantCount;
            this.maxCapacity = maxCapacity;
            this.currentConnections = currentConnections;
            this.cpuUsage = cpuUsage;
            this.memoryUsage = memoryUsage;
        }

        public double getLoadScore() {
            // Composite load score: weighted mix of capacity, connections, and resource usage
            double capacityRatio = (double) tenantCount / maxCapacity;
            double connectionRatio = (double) currentConnections / 100; // Assuming max 100 connections

            return (capacityRatio * 0.4) + (connectionRatio * 0.3) + (cpuUsage * 0.2) + (memoryUsage * 0.1);
        }

        // Getters
        public String getShardKey() { return shardKey; }
        public int getTenantCount() { return tenantCount; }
        public int getMaxCapacity() { return maxCapacity; }
        public int getCurrentConnections() { return currentConnections; }
        public double getCpuUsage() { return cpuUsage; }
        public double getMemoryUsage() { return memoryUsage; }
    }
}

// Multi-tenant order service using directory-based sharding
@Service
public class OrderService {

    private final ShardDirectoryService shardDirectoryService;

    public OrderService(ShardDirectoryService shardDirectoryService) {
        this.shardDirectoryService = shardDirectoryService;
    }

    // One template per lookup; avoids mutating shared JdbcTemplate state
    private JdbcTemplate templateFor(String tenantId) {
        String shardKey = shardDirectoryService.determineShardKey(tenantId);
        return new JdbcTemplate(shardDirectoryService.getDataSource(shardKey));
    }

    public Order findById(String tenantId, Long orderId) {
        try {
            return templateFor(tenantId).queryForObject(
                "SELECT * FROM orders WHERE tenant_id = ? AND id = ?",
                new OrderRowMapper(),
                tenantId, orderId
            );
        } catch (EmptyResultDataAccessException e) {
            return null;
        }
    }

    public List<Order> findByTenant(String tenantId, int limit, int offset) {
        return templateFor(tenantId).query(
            "SELECT * FROM orders WHERE tenant_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?",
            new OrderRowMapper(),
            tenantId, limit, offset
        );
    }

    public void save(Order order) {
        templateFor(order.getTenantId()).update(
            "INSERT INTO orders (tenant_id, id, customer_id, total_amount, status, created_at) " +
            "VALUES (?, ?, ?, ?, ?, ?)",
            order.getTenantId(),
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getStatus(),
            order.getCreatedAt()
        );
    }

    // Cross-tenant analytics (admin function)
    @PreAuthorize("hasRole('ADMIN')")
    public Map<String, Object> getGlobalOrderStatistics() {
        Map<String, Boolean> shardHealth = shardDirectoryService.checkShardHealth();
        Map<String, Object> globalStats = new HashMap<>();

        List<Map<String, Object>> allShardStats = shardDirectoryService.getShardDataSources().entrySet()
            .parallelStream()
            .filter(entry -> Boolean.TRUE.equals(shardHealth.get(entry.getKey())))
            .map(entry -> {
                JdbcTemplate jdbc = new JdbcTemplate(entry.getValue());

                return jdbc.queryForMap(
                    "SELECT " +
                    "  COUNT(*) as total_orders, " +
                    "  SUM(total_amount) as total_revenue, " +
                    "  COUNT(DISTINCT tenant_id) as tenant_count " +
                    "FROM orders"
                );
            })
            .collect(Collectors.toList());

        // Aggregate results
        long totalOrders = allShardStats.stream()
            .mapToLong(stat -> ((Number) stat.get("total_orders")).longValue())
            .sum();

        BigDecimal totalRevenue = allShardStats.stream()
            .map(stat -> (BigDecimal) stat.get("total_revenue"))
            .filter(Objects::nonNull)
            .reduce(BigDecimal.ZERO, BigDecimal::add);

        // Derive the global average from global totals; averaging per-shard
        // averages would weight small shards too heavily
        double avgOrderValue = totalOrders > 0
            ? totalRevenue.doubleValue() / totalOrders
            : 0.0;

        long totalTenants = allShardStats.stream()
            .mapToLong(stat -> ((Number) stat.get("tenant_count")).longValue())
            .sum();

        globalStats.put("total_orders", totalOrders);
        globalStats.put("total_revenue", totalRevenue);
        globalStats.put("avg_order_value", avgOrderValue);
        globalStats.put("total_tenants", totalTenants);
        globalStats.put("healthy_shards", shardHealth.values().stream().mapToInt(h -> h ? 1 : 0).sum());
        globalStats.put("total_shards", shardHealth.size());

        return globalStats;
    }
}
✅ Directory-Based Sharding Pros and Cons
✅ Advantages:
- Flexible shard assignment
- Easy to rebalance data
- Support for complex routing logic
- Can optimize based on access patterns
❌ Disadvantages:
- Additional lookup overhead
- Directory service becomes a bottleneck
- More complex architecture
- Potential single point of failure
🎯 Use Cases:
- Multi-tenant applications
- Geographic data distribution
- Load-based shard assignment
- Complex business logic routing
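Stripped of caching and persistence, the directory pattern reduces to a lookup table plus an assignment policy for unseen tenants. A minimal in-memory sketch (hypothetical `ShardDirectory` class; a real directory lives in a replicated store, as in the service above):

```java
import java.util.HashMap;
import java.util.Map;

public class ShardDirectory {
    // In-memory map standing in for the directory database
    private final Map<String, String> mapping = new HashMap<>();
    private final String[] shards;

    public ShardDirectory(String... shards) {
        this.shards = shards;
    }

    // Look up the tenant's shard; assign unseen tenants round-robin.
    // Once assigned, a tenant's shard is sticky across lookups.
    public String shardFor(String tenantId) {
        return mapping.computeIfAbsent(tenantId,
            id -> shards[mapping.size() % shards.length]);
    }

    public static void main(String[] args) {
        ShardDirectory dir = new ShardDirectory("shard-us-east", "shard-eu");
        System.out.println(dir.shardFor("tenant-a"));
        System.out.println(dir.shardFor("tenant-b"));
        System.out.println(dir.shardFor("tenant-a")); // same shard as before
    }
}
```

The key property the sketch illustrates is indirection: because placement is data, not a formula, rebalancing is just an update to the mapping (plus a data copy), with no global remapping of keys.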
📊 Sharding Strategy Comparison
🎯 Comparison Matrix
Aspect | Range-Based | Hash-Based | Directory-Based |
---|---|---|---|
Data Distribution | Can be uneven | Even distribution | Configurable |
Range Queries | Excellent | Poor | Good |
Point Queries | Good | Excellent | Good |
Resharding | Moderate | Difficult | Easy |
Hot Spots | Possible | Rare | Preventable |
Complexity | Low | Low | High |
Lookup Overhead | None | None | Yes |
Cross-Shard Queries | Limited | All shards | Selective |
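The "Difficult" resharding rating for hash-based sharding assumes naive `hash(key) % N` routing, where changing N remaps almost every key. Consistent hashing softens this: shards are placed on a hash ring, and adding one only remaps the keys that land on its ring segments. A minimal sketch, with ring and shard names chosen for illustration (not from any specific library):

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Minimal consistent-hash ring: adding a shard only remaps the keys that
// fall between the new shard's virtual nodes and their ring predecessors.
public class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    public ConsistentHashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    private long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    public void addShard(String shard) {
        // Virtual nodes spread each shard around the ring for evenness
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(shard + "#" + i), shard);
        }
    }

    public String shardFor(String key) {
        // First ring position at or after the key's hash, wrapping around
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }
}
```

With `hash % N`, going from 4 to 5 shards remaps roughly 80% of keys; on the ring above, only about 1/N of keys move to the new shard.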
📈 Performance Characteristics
graph TD
A[Query Performance] --> B[Point Queries]
A --> C[Range Queries]
A --> D[Cross-Shard Queries]
B --> E[Hash: Excellent<br/>Range: Good<br/>Directory: Good]
C --> F[Range: Excellent<br/>Hash: Poor<br/>Directory: Good]
D --> G[Range: Limited<br/>Hash: All Shards<br/>Directory: Selective]
style B fill:#4ecdc4
style C fill:#feca57
style D fill:#ff6b6b
🛠️ Implementation Best Practices
1. Shard Key Selection
// Good shard key characteristics
public class ShardKeyAnalysis {

    // High cardinality - many unique values
    public void analyzeCardinality(String tableName, String columnName, DataSource dataSource) {
        JdbcTemplate jdbc = new JdbcTemplate(dataSource);

        Long totalRows = jdbc.queryForObject("SELECT COUNT(*) FROM " + tableName, Long.class);
        Long uniqueValues = jdbc.queryForObject(
            "SELECT COUNT(DISTINCT " + columnName + ") FROM " + tableName, Long.class);

        double cardinality = (double) uniqueValues / totalRows;

        if (cardinality > 0.8) {
            System.out.println(columnName + " has high cardinality: " + cardinality);
        } else {
            System.out.println("Warning: " + columnName + " has low cardinality: " + cardinality);
        }
    }

    // Even distribution analysis
    public void analyzeDistribution(List<Object> sampleKeys, ShardingStrategy strategy) {
        Map<String, Integer> distribution = sampleKeys.stream()
            .collect(Collectors.groupingBy(
                key -> strategy.determineShardKey(key),
                Collectors.collectingAndThen(Collectors.counting(), Math::toIntExact)
            ));

        double mean = distribution.values().stream().mapToInt(Integer::intValue).average().orElse(0);
        double variance = distribution.values().stream()
            .mapToDouble(count -> Math.pow(count - mean, 2))
            .average()
            .orElse(0);

        System.out.println("Distribution analysis:");
        System.out.println("Mean: " + mean);
        System.out.println("Variance: " + variance);
        System.out.println("Standard Deviation: " + Math.sqrt(variance));

        distribution.forEach((shard, count) ->
            System.out.println(shard + ": " + count + " (" + (count / mean) * 100 + "% of average)")
        );
    }
}
2. Connection Pool Management
@Configuration
public class ShardingDataSourceConfig {

    @Bean
    public Map<String, DataSource> shardDataSources() {
        Map<String, DataSource> dataSources = new HashMap<>();

        // Configure each shard with appropriate pool settings
        List<ShardConfig> shardConfigs = Arrays.asList(
            new ShardConfig("shard1", "jdbc:mysql://db1:3306/shard1", 20, 5),
            new ShardConfig("shard2", "jdbc:mysql://db2:3306/shard2", 20, 5),
            new ShardConfig("shard3", "jdbc:mysql://db3:3306/shard3", 15, 3),
            new ShardConfig("shard4", "jdbc:mysql://db4:3306/shard4", 25, 8)
        );

        shardConfigs.forEach(config -> {
            HikariConfig hikariConfig = new HikariConfig();
            hikariConfig.setJdbcUrl(config.getJdbcUrl());
            // In production, load credentials from configuration or a secrets manager
            hikariConfig.setUsername("app_user");
            hikariConfig.setPassword("app_password");
            hikariConfig.setMaximumPoolSize(config.getMaxPoolSize());
            hikariConfig.setMinimumIdle(config.getMinIdle());
            hikariConfig.setConnectionTimeout(30000);
            hikariConfig.setIdleTimeout(300000);
            hikariConfig.setMaxLifetime(1800000);
            hikariConfig.setLeakDetectionThreshold(60000);

            // Pool name for monitoring
            hikariConfig.setPoolName(config.getShardKey() + "-pool");

            dataSources.put(config.getShardKey(), new HikariDataSource(hikariConfig));
        });

        return dataSources;
    }

    private static class ShardConfig {
        private final String shardKey;
        private final String jdbcUrl;
        private final int maxPoolSize;
        private final int minIdle;

        public ShardConfig(String shardKey, String jdbcUrl, int maxPoolSize, int minIdle) {
            this.shardKey = shardKey;
            this.jdbcUrl = jdbcUrl;
            this.maxPoolSize = maxPoolSize;
            this.minIdle = minIdle;
        }

        public String getShardKey() { return shardKey; }
        public String getJdbcUrl() { return jdbcUrl; }
        public int getMaxPoolSize() { return maxPoolSize; }
        public int getMinIdle() { return minIdle; }
    }
}
3. Monitoring and Alerting
@Component
public class ShardingMonitor {

    private final MeterRegistry meterRegistry;
    private final Map<String, DataSource> shardDataSources;

    @Scheduled(fixedRate = 60000) // Every minute
    public void collectShardMetrics() {
        shardDataSources.forEach((shardKey, dataSource) -> {
            if (dataSource instanceof HikariDataSource) {
                HikariDataSource hikariDS = (HikariDataSource) dataSource;
                HikariPoolMXBean poolBean = hikariDS.getHikariPoolMXBean();

                // Connection pool metrics (re-registering a gauge with the same
                // name and tags returns the existing meter, so repeating is safe)
                Gauge.builder("shard.connection.active", poolBean, HikariPoolMXBean::getActiveConnections)
                    .tag("shard", shardKey)
                    .register(meterRegistry);

                Gauge.builder("shard.connection.idle", poolBean, HikariPoolMXBean::getIdleConnections)
                    .tag("shard", shardKey)
                    .register(meterRegistry);

                Gauge.builder("shard.connection.total", poolBean, HikariPoolMXBean::getTotalConnections)
                    .tag("shard", shardKey)
                    .register(meterRegistry);

                // Query performance metrics
                collectQueryMetrics(shardKey, dataSource);
            }
        });
    }

    private void collectQueryMetrics(String shardKey, DataSource dataSource) {
        try {
            JdbcTemplate jdbc = new JdbcTemplate(dataSource);

            // Query response time ("SELECT 1" health probe)
            long startTime = System.nanoTime();
            jdbc.queryForObject("SELECT 1", Integer.class);
            long duration = System.nanoTime() - startTime;

            Timer.builder("shard.query.response_time")
                .tag("shard", shardKey)
                .register(meterRegistry)
                .record(duration, TimeUnit.NANOSECONDS);

            // Table metrics
            List<Map<String, Object>> tableStats = jdbc.queryForList(
                "SELECT " +
                "    table_name, " +
                "    table_rows, " +
                "    data_length, " +
                "    index_length " +
                "FROM information_schema.tables " +
                "WHERE table_schema = DATABASE()"
            );

            tableStats.forEach(stat -> {
                String tableName = (String) stat.get("table_name");
                Number tableRows = (Number) stat.get("table_rows");
                Number dataLength = (Number) stat.get("data_length");

                Gauge.builder("shard.table.rows", tableRows, Number::doubleValue)
                    .tag("shard", shardKey)
                    .tag("table", tableName)
                    .register(meterRegistry);

                Gauge.builder("shard.table.size_bytes", dataLength, Number::doubleValue)
                    .tag("shard", shardKey)
                    .tag("table", tableName)
                    .register(meterRegistry);
            });

        } catch (Exception e) {
            // Log error and increment error counter
            Counter.builder("shard.health_check.errors")
                .tag("shard", shardKey)
                .register(meterRegistry)
                .increment();
        }
    }
}
🚀 Advanced Considerations
1. Cross-Shard Transactions
// Best-effort cross-shard coordinator. Note: this is NOT true two-phase
// commit - a failure between commits can still leave shards inconsistent.
// Use XA transactions where full atomicity is required.
@Service
public class DistributedTransactionManager {

    private final Map<String, DataSource> shardDataSources;

    public void executeDistributedTransaction(List<ShardOperation> operations) {
        Map<String, Connection> connections = new HashMap<>();

        try {
            // Step 1: Open a transaction on every involved shard
            for (ShardOperation operation : operations) {
                String shardKey = operation.getShardKey();
                DataSource dataSource = shardDataSources.get(shardKey);
                Connection connection = dataSource.getConnection();
                connection.setAutoCommit(false);
                connections.put(shardKey, connection);
            }

            // Step 2: Execute operations
            for (ShardOperation operation : operations) {
                Connection connection = connections.get(operation.getShardKey());
                operation.execute(connection);
            }

            // Step 3: Commit all transactions (best-effort)
            for (Connection connection : connections.values()) {
                connection.commit();
            }

        } catch (Exception e) {
            // Rollback all transactions
            connections.values().forEach(connection -> {
                try {
                    connection.rollback();
                } catch (SQLException rollbackException) {
                    System.err.println("Rollback failed: " + rollbackException.getMessage());
                }
            });
            throw new RuntimeException("Distributed transaction failed", e);

        } finally {
            // Close all connections
            connections.values().forEach(connection -> {
                try {
                    connection.close();
                } catch (SQLException closeException) {
                    System.err.println("Connection close failed: " + closeException.getMessage());
                }
            });
        }
    }

    public interface ShardOperation {
        String getShardKey();
        void execute(Connection connection) throws SQLException;
    }
}
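Full atomicity needs a true two-phase commit, where every participant votes in a prepare phase before anyone commits. The decision rule can be sketched independently of any database; the `Participant` interface below is a hypothetical stand-in for an XA resource:

```java
import java.util.ArrayList;
import java.util.List;

// Two-phase commit decision rule: commit only if every participant
// successfully prepares; otherwise roll back everything prepared so far.
public class TwoPhaseCommit {

    public interface Participant {
        boolean prepare();   // vote: true = ready to commit
        void commit();
        void rollback();
    }

    // Returns true if the transaction committed on all participants.
    public static boolean run(List<Participant> participants) {
        List<Participant> prepared = new ArrayList<>();
        for (Participant p : participants) {
            if (p.prepare()) {
                prepared.add(p);
            } else {
                // A single "no" vote aborts every prepared participant
                prepared.forEach(Participant::rollback);
                return false;
            }
        }
        // All voted yes - commit phase
        prepared.forEach(Participant::commit);
        return true;
    }
}
```

A real coordinator must also persist its decision so it can resolve in-doubt participants after a crash; MySQL exposes this protocol via `XA START` / `XA PREPARE` / `XA COMMIT`.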
2. Data Migration and Resharding
@Service
public class ReshardingService {

    private final Map<String, DataSource> shardDataSources;

    public void reshardData(String sourceShardKey, String targetShardKey,
                            ReshardingCriteria criteria) {

        DataSource sourceDS = shardDataSources.get(sourceShardKey);
        DataSource targetDS = shardDataSources.get(targetShardKey);

        // 1. Identify data to migrate
        List<Long> recordsToMigrate = identifyRecordsForMigration(sourceDS, criteria);

        // 2. Copy data in batches
        int batchSize = 1000;
        for (int i = 0; i < recordsToMigrate.size(); i += batchSize) {
            List<Long> batch = recordsToMigrate.subList(i,
                Math.min(i + batchSize, recordsToMigrate.size()));

            migrateDataBatch(sourceDS, targetDS, batch);
        }

        // 3. Verify data integrity
        boolean verificationPassed = verifyDataIntegrity(sourceDS, targetDS, recordsToMigrate);

        if (verificationPassed) {
            // 4. Delete migrated data from source
            deleteMigratedData(sourceDS, recordsToMigrate);
            System.out.println("Successfully resharded " + recordsToMigrate.size() + " records");
        } else {
            throw new RuntimeException("Data integrity verification failed");
        }
    }

    // identifyRecordsForMigration, migrateDataBatch, verifyDataIntegrity and
    // deleteMigratedData are application-specific and omitted here
}
🎯 Conclusion
MySQL sharding is a powerful technique for horizontal scaling, but it requires careful planning and implementation. Each sharding strategy has its own trade-offs:
Key Takeaways:
- Range-Based Sharding: Best for time-series and sequential data
- Hash-Based Sharding: Provides even distribution but complicates range queries
- Directory-Based Sharding: Offers flexibility at the cost of complexity
Best Practices:
- Choose the right shard key - High cardinality, even distribution
- Plan for growth - Design resharding strategy from the beginning
- Monitor actively - Track performance, distribution, and health
- Test thoroughly - Especially cross-shard operations and failure scenarios
- Keep it simple - Start with simpler strategies and evolve
Remember that sharding adds significant complexity to your application. Consider alternatives like read replicas, vertical scaling, or managed database solutions before implementing sharding.