Introduction
Building upon our deep dive into Java concurrency fundamentals, this third part explores how classic design patterns can be elegantly implemented using thread interfaces. We’ll examine how Runnable, Callable, and other concurrency primitives can be combined with design patterns to create robust, scalable, and maintainable concurrent systems.
This guide demonstrates practical implementations of essential design patterns in concurrent environments, showing how threading interfaces enhance traditional patterns while addressing the unique challenges of multi-threaded programming.
Producer-Consumer Pattern
Pattern Overview
The Producer-Consumer pattern is fundamental in concurrent programming, allowing producers to generate data while consumers process it independently, with proper synchronization and buffering.
graph TD
A[Producer Threads] --> B[Shared Buffer/Queue]
B --> C[Consumer Threads]
D[Producer 1] --> E[BlockingQueue]
F[Producer 2] --> E
G[Producer 3] --> E
E --> H[Consumer 1]
E --> I[Consumer 2]
E --> J[Consumer 3]
K[Synchronization] --> L["put() blocks when full"]
K --> M["take() blocks when empty"]
K --> N[Thread-safe operations]
style A fill:#ff6b6b
style C fill:#4ecdc4
style E fill:#feca57
Implementation with Thread Interfaces
1. Basic Producer-Consumer with BlockingQueue:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerConsumerPattern {

    private final BlockingQueue<Task> taskQueue;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    // Set once every producer has exited, so consumers know no more tasks can arrive
    private final AtomicBoolean producersDone = new AtomicBoolean(false);
    private final List<Thread> producerThreads = new ArrayList<>();
    private final List<Thread> consumerThreads = new ArrayList<>();

    public ProducerConsumerPattern(int queueCapacity) {
        this.taskQueue = new ArrayBlockingQueue<>(queueCapacity);
    }

    // Producer implementation using Runnable
    public class Producer implements Runnable {
        private final String producerId;
        private final TaskGenerator taskGenerator;
        private final int productionRate;

        public Producer(String producerId, TaskGenerator taskGenerator, int productionRate) {
            if (productionRate <= 0) {
                throw new IllegalArgumentException("productionRate must be positive");
            }
            this.producerId = producerId;
            this.taskGenerator = taskGenerator;
            this.productionRate = productionRate;
        }

        @Override
        public void run() {
            try {
                while (!shutdown.get() && !Thread.currentThread().isInterrupted()) {
                    Task task = taskGenerator.generateTask();

                    // Timed offer: waits up to 1 second for free space, then gives up.
                    // (put() would block indefinitely instead.)
                    boolean offered = taskQueue.offer(task, 1, TimeUnit.SECONDS);

                    if (offered) {
                        System.out.println(producerId + " produced: " + task);
                    } else {
                        System.out.println(producerId + " queue full, task rejected: " + task);
                    }

                    // Control production rate (tasks per second)
                    Thread.sleep(1000L / productionRate);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println(producerId + " interrupted");
            }
            System.out.println(producerId + " shutdown");
        }
    }

    // Consumer implementation using Runnable
    public class Consumer implements Runnable {
        private final String consumerId;
        private final TaskProcessor taskProcessor;

        public Consumer(String consumerId, TaskProcessor taskProcessor) {
            this.consumerId = consumerId;
            this.taskProcessor = taskProcessor;
        }

        @Override
        public void run() {
            try {
                // Keep draining until all producers have exited and the queue is empty
                while (!producersDone.get() || !taskQueue.isEmpty()) {
                    // Timed poll: waits up to 1 second for a task, then rechecks the loop condition.
                    // (take() would block indefinitely instead.)
                    Task task = taskQueue.poll(1, TimeUnit.SECONDS);

                    if (task != null) {
                        System.out.println(consumerId + " consuming: " + task);
                        taskProcessor.processTask(task);
                        System.out.println(consumerId + " completed: " + task);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println(consumerId + " interrupted");
            }
            System.out.println(consumerId + " shutdown");
        }
    }

    // Start the producer-consumer system
    public void start(int producerCount, int consumerCount) {
        // Start producers
        for (int i = 0; i < producerCount; i++) {
            Producer producer = new Producer(
                "Producer-" + i,
                new RandomTaskGenerator(),
                2 // 2 tasks per second
            );
            Thread producerThread = new Thread(producer);
            producerThreads.add(producerThread);
            producerThread.start();
        }

        // Start consumers
        for (int i = 0; i < consumerCount; i++) {
            Consumer consumer = new Consumer(
                "Consumer-" + i,
                new DefaultTaskProcessor()
            );
            Thread consumerThread = new Thread(consumer);
            consumerThreads.add(consumerThread);
            consumerThread.start();
        }
    }

    public void shutdown() throws InterruptedException {
        shutdown.set(true);

        // Wait for producers to finish
        for (Thread producer : producerThreads) {
            producer.join();
        }

        // No producer can enqueue anymore; consumers may exit once the queue is empty
        producersDone.set(true);

        // Wait for consumers to finish processing remaining tasks
        for (Thread consumer : consumerThreads) {
            consumer.join();
        }

        System.out.println("All threads shutdown. Remaining tasks: " + taskQueue.size());
    }

    // Supporting classes
    public static class Task {
        private final String id;
        private final String data;
        private final long timestamp;

        public Task(String id, String data) {
            this.id = id;
            this.data = data;
            this.timestamp = System.currentTimeMillis();
        }

        @Override
        public String toString() {
            return "Task{id='" + id + "', data='" + data + "'}";
        }

        // Getters
        public String getId() { return id; }
        public String getData() { return data; }
        public long getTimestamp() { return timestamp; }
    }

    public interface TaskGenerator {
        Task generateTask();
    }

    public interface TaskProcessor {
        void processTask(Task task) throws InterruptedException;
    }

    public static class RandomTaskGenerator implements TaskGenerator {
        private final AtomicInteger counter = new AtomicInteger(0);
        private final Random random = new Random();

        @Override
        public Task generateTask() {
            return new Task(
                "TASK-" + counter.incrementAndGet(),
                "Data-" + random.nextInt(1000)
            );
        }
    }

    public static class DefaultTaskProcessor implements TaskProcessor {
        @Override
        public void processTask(Task task) throws InterruptedException {
            // Simulate processing time
            Thread.sleep(100 + (long) (Math.random() * 200));
        }
    }
}
2. Advanced Producer-Consumer with Callable Results:
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncProducerConsumerPattern {

    private final ExecutorService producerExecutor;
    // Despite its name, this pool runs the submitted Callables; run AsyncConsumer
    // instances on a different executor so they cannot starve the workers
    private final ExecutorService consumerExecutor;
    private final CompletionService<ProcessingResult> completionService;

    public AsyncProducerConsumerPattern(int poolSize) {
        this.producerExecutor = Executors.newFixedThreadPool(poolSize);
        this.consumerExecutor = Executors.newFixedThreadPool(poolSize);
        this.completionService = new ExecutorCompletionService<>(consumerExecutor);
    }

    // Producer that submits Callable tasks
    public class AsyncProducer implements Runnable {
        private final String producerId;
        private final List<ProcessingTask> tasksToProcess;

        public AsyncProducer(String producerId, List<ProcessingTask> tasksToProcess) {
            this.producerId = producerId;
            this.tasksToProcess = tasksToProcess;
        }

        @Override
        public void run() {
            for (ProcessingTask task : tasksToProcess) {
                if (Thread.currentThread().isInterrupted()) {
                    break;
                }
                // The completion service queues each Future internally once its task finishes,
                // so no separate result queue is needed
                completionService.submit(new ProcessingCallable(task));
                System.out.println(producerId + " submitted task: " + task.getId());
            }
        }
    }

    // Consumer that processes Future results
    public class AsyncConsumer implements Runnable {
        private final String consumerId;
        private final int expectedResults;

        public AsyncConsumer(String consumerId, int expectedResults) {
            this.consumerId = consumerId;
            this.expectedResults = expectedResults;
        }

        @Override
        public void run() {
            int processedCount = 0;

            while (processedCount < expectedResults) {
                try {
                    // Blocks until the next task finishes, successfully or not
                    Future<ProcessingResult> completedFuture = completionService.take();
                    // Count failed tasks too; otherwise the loop would wait forever for them
                    processedCount++;
                    ProcessingResult result = completedFuture.get();

                    System.out.println(consumerId + " processed result: " + result);

                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (ExecutionException e) {
                    System.err.println(consumerId + " execution error: " + e.getCause());
                }
            }

            System.out.println(consumerId + " handled " + processedCount + " completed tasks");
        }
    }

    // Callable implementation for processing tasks
    public static class ProcessingCallable implements Callable<ProcessingResult> {
        private final ProcessingTask task;

        public ProcessingCallable(ProcessingTask task) {
            this.task = task;
        }

        @Override
        public ProcessingResult call() throws Exception {
            // Simulate complex processing
            Thread.sleep((long) (Math.random() * 1000) + 500);

            if (Math.random() < 0.1) { // 10% failure rate
                throw new ProcessingException("Processing failed for task: " + task.getId());
            }

            return new ProcessingResult(
                task.getId(),
                "Processed: " + task.getData(),
                System.currentTimeMillis()
            );
        }
    }

    // Supporting classes
    public static class ProcessingTask {
        private final String id;
        private final String data;

        public ProcessingTask(String id, String data) {
            this.id = id;
            this.data = data;
        }

        public String getId() { return id; }
        public String getData() { return data; }

        @Override
        public String toString() {
            return "ProcessingTask{id='" + id + "', data='" + data + "'}";
        }
    }

    public static class ProcessingResult {
        private final String taskId;
        private final String result;
        private final long completedAt;

        public ProcessingResult(String taskId, String result, long completedAt) {
            this.taskId = taskId;
            this.result = result;
            this.completedAt = completedAt;
        }

        @Override
        public String toString() {
            return "ProcessingResult{taskId='" + taskId + "', result='" + result + "'}";
        }
    }

    public static class ProcessingException extends Exception {
        public ProcessingException(String message) {
            super(message);
        }
    }

    public void shutdown() {
        producerExecutor.shutdown();
        consumerExecutor.shutdown();
    }
}
Pros and Cons
Advantages:
- Decoupling: Producers and consumers operate independently
- Scalability: Easy to add more producers or consumers
- Buffering: Queue provides buffer for rate differences
- Fault Tolerance: A failed or slow consumer does not stop producers, as long as the remaining consumers keep draining the queue
Disadvantages:
- Memory Usage: Queue can consume significant memory
- Complexity: Requires careful thread management
- Backpressure: Need to handle queue overflow scenarios
- Latency: Queuing introduces processing delays
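The backpressure point can be made concrete with a small sketch. It assumes a hypothetical `BackpressureDemo` class (the name and its methods are illustrative, not part of the article's code) and shows one common option: a bounded queue with a timed `offer`, so the producer is told when the queue is full instead of blocking forever or letting memory grow without bound.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; names are assumptions, not the article's API.
class BackpressureDemo {
    private final BlockingQueue<String> queue;
    private final AtomicInteger rejected = new AtomicInteger();

    BackpressureDemo(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Waits up to timeoutMs for free space; returns false if the item was not accepted.
    boolean submit(String item, long timeoutMs) {
        try {
            boolean accepted = queue.offer(item, timeoutMs, TimeUnit.MILLISECONDS);
            if (!accepted) {
                rejected.incrementAndGet();
            }
            return accepted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            rejected.incrementAndGet();
            return false;
        }
    }

    // Non-blocking removal; returns null when the queue is empty.
    String pollNow() {
        return queue.poll();
    }

    int rejectedCount() {
        return rejected.get();
    }

    public static void main(String[] args) {
        BackpressureDemo demo = new BackpressureDemo(2);
        System.out.println("a accepted: " + demo.submit("a", 10));
        System.out.println("b accepted: " + demo.submit("b", 10));
        System.out.println("c accepted: " + demo.submit("c", 10)); // queue is full here
        demo.pollNow();                                            // a consumer frees one slot
        System.out.println("c accepted on retry: " + demo.submit("c", 10));
        System.out.println("rejected so far: " + demo.rejectedCount());
    }
}
```

What to do with a rejected item (retry, drop, log, or slow the producer down) is an application decision; the sketch only counts rejections.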
Use Cases:
- Web server request processing
- Log processing systems
- Message queue implementations
- Data streaming applications
Observer Pattern
Pattern Overview
The Observer pattern allows objects to notify multiple observers about state changes, particularly useful in concurrent environments for event-driven architectures.
graph TD
A[Subject/Observable] --> B[Observer 1]
A --> C[Observer 2]
A --> D[Observer 3]
A --> E[Observer N]
F[State Change] --> G["notifyObservers()"]
G --> H[Concurrent Notification]
H --> I[Observer Thread 1]
H --> J[Observer Thread 2]
H --> K[Observer Thread 3]
style A fill:#ff6b6b
style H fill:#4ecdc4
style F fill:#feca57
Implementation with Thread Interfaces
1. Concurrent Observer with Runnable:
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentObserverPattern<T> {

    private final List<Observer<T>> observers = new CopyOnWriteArrayList<>();
    private final ExecutorService notificationExecutor;
    private final AtomicReference<T> currentState = new AtomicReference<>();

    public ConcurrentObserverPattern(int threadPoolSize) {
        this.notificationExecutor = Executors.newFixedThreadPool(threadPoolSize);
    }

    // Observer interface
    public interface Observer<T> {
        void onStateChanged(T newState, T oldState);
        String getObserverId();
    }

    // Observable subject methods
    public void addObserver(Observer<T> observer) {
        observers.add(observer);
        System.out.println("Added observer: " + observer.getObserverId());
    }

    public void removeObserver(Observer<T> observer) {
        observers.remove(observer);
        System.out.println("Removed observer: " + observer.getObserverId());
    }

    // Note: concurrent setState calls may deliver their notifications out of order
    public void setState(T newState) {
        T oldState = currentState.getAndSet(newState);
        notifyObservers(newState, oldState);
    }

    public T getState() {
        return currentState.get();
    }

    // Concurrent notification using Runnable
    private void notifyObservers(T newState, T oldState) {
        for (Observer<T> observer : observers) {
            // Each notification is a separate task on the shared pool; with more
            // observers than pool threads, notifications wait in the pool's queue
            notificationExecutor.submit(new ObserverNotificationTask(observer, newState, oldState));
        }
    }

    // Runnable implementation for observer notifications
    private class ObserverNotificationTask implements Runnable {
        private final Observer<T> observer;
        private final T newState;
        private final T oldState;

        public ObserverNotificationTask(Observer<T> observer, T newState, T oldState) {
            this.observer = observer;
            this.newState = newState;
            this.oldState = oldState;
        }

        @Override
        public void run() {
            try {
                long startTime = System.nanoTime();
                observer.onStateChanged(newState, oldState);
                long duration = System.nanoTime() - startTime;

                System.out.println("Observer " + observer.getObserverId() +
                    " notified in " + duration / 1_000_000 + "ms");

            } catch (Exception e) {
                // A failing observer must not take down the pool thread
                System.err.println("Error notifying observer " + observer.getObserverId() + ": " + e.getMessage());
            }
        }
    }

    // Example observer implementations
    public static class LoggingObserver<T> implements Observer<T> {
        private final String id;

        public LoggingObserver(String id) {
            this.id = id;
        }

        @Override
        public void onStateChanged(T newState, T oldState) {
            // Simulate processing time
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            System.out.println("[" + id + "] State changed from " + oldState + " to " + newState);
        }

        @Override
        public String getObserverId() {
            return id;
        }
    }

    public static class MetricsObserver<T> implements Observer<T> {
        private final String id;
        private final AtomicLong changeCount = new AtomicLong(0);

        public MetricsObserver(String id) {
            this.id = id;
        }

        @Override
        public void onStateChanged(T newState, T oldState) {
            long count = changeCount.incrementAndGet();
            System.out.println("[" + id + "] Recorded state change #" + count);
        }

        @Override
        public String getObserverId() {
            return id;
        }

        public long getChangeCount() {
            return changeCount.get();
        }
    }

    public void shutdown() {
        notificationExecutor.shutdown();
        try {
            if (!notificationExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                notificationExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            notificationExecutor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
2. Event-Driven Observer with Callable:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class EventDrivenObserverPattern {

    private final Map<Class<?>, List<EventHandler<?>>> handlerMap = new ConcurrentHashMap<>();
    private final ExecutorService eventProcessorExecutor;

    public EventDrivenObserverPattern(int threadPoolSize) {
        this.eventProcessorExecutor = Executors.newFixedThreadPool(threadPoolSize);
    }

    // Event handler interface
    public interface EventHandler<E> {
        EventProcessingResult handleEvent(E event) throws Exception;
        String getHandlerId();
    }

    // Register event handlers
    public <E> void registerHandler(Class<E> eventType, EventHandler<E> handler) {
        handlerMap.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>()).add(handler);
        System.out.println("Registered handler " + handler.getHandlerId() + " for event type " + eventType.getSimpleName());
    }

    // Publish event and return futures for all processing results.
    // Handlers are looked up by the event's exact class; subclasses are not matched.
    @SuppressWarnings("unchecked")
    public <E> List<Future<EventProcessingResult>> publishEvent(E event) {
        List<Future<EventProcessingResult>> futures = new ArrayList<>();
        List<EventHandler<?>> handlers = handlerMap.get(event.getClass());

        if (handlers != null) {
            for (EventHandler<?> handler : handlers) {
                // Safe because registerHandler only stores handlers under their own event type
                EventHandler<E> typedHandler = (EventHandler<E>) handler;
                // Submit straight to the executor: the caller owns the futures, so a
                // CompletionService that nobody drains would only accumulate results
                Future<EventProcessingResult> future = eventProcessorExecutor.submit(
                    new EventProcessingCallable<>(event, typedHandler)
                );
                futures.add(future);
            }
        }

        return futures;
    }

    // Callable implementation for event processing
    private static class EventProcessingCallable<E> implements Callable<EventProcessingResult> {
        private final E event;
        private final EventHandler<E> handler;

        public EventProcessingCallable(E event, EventHandler<E> handler) {
            this.event = event;
            this.handler = handler;
        }

        @Override
        public EventProcessingResult call() {
            long startTime = System.currentTimeMillis();

            try {
                EventProcessingResult result = handler.handleEvent(event);
                long duration = System.currentTimeMillis() - startTime;

                // Keep the handler's own success flag and message, but record the measured duration
                return new EventProcessingResult(
                    handler.getHandlerId(),
                    event.getClass().getSimpleName(),
                    result.isSuccess(),
                    duration,
                    result.getMessage()
                );

            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                long duration = System.currentTimeMillis() - startTime;

                // Failures are reported as a result rather than rethrown
                return new EventProcessingResult(
                    handler.getHandlerId(),
                    event.getClass().getSimpleName(),
                    false,
                    duration,
                    "Error: " + e.getMessage()
                );
            }
        }
    }

    // Event processing result
    public static class EventProcessingResult {
        private final String handlerId;
        private final String eventType;
        private final boolean success;
        private final long processingTimeMs;
        private final String message;

        public EventProcessingResult(String handlerId, String eventType, boolean success,
                                     long processingTimeMs, String message) {
            this.handlerId = handlerId;
            this.eventType = eventType;
            this.success = success;
            this.processingTimeMs = processingTimeMs;
            this.message = message;
        }

        @Override
        public String toString() {
            return String.format("EventProcessingResult{handler='%s', event='%s', success=%s, time=%dms, message='%s'}",
                handlerId, eventType, success, processingTimeMs, message);
        }

        // Getters
        public String getHandlerId() { return handlerId; }
        public String getEventType() { return eventType; }
        public boolean isSuccess() { return success; }
        public long getProcessingTimeMs() { return processingTimeMs; }
        public String getMessage() { return message; }
    }

    // Example events and handlers
    public static class UserRegisteredEvent {
        private final String userId;
        private final String email;
        private final long timestamp;

        public UserRegisteredEvent(String userId, String email) {
            this.userId = userId;
            this.email = email;
            this.timestamp = System.currentTimeMillis();
        }

        public String getUserId() { return userId; }
        public String getEmail() { return email; }
        public long getTimestamp() { return timestamp; }
    }

    public static class EmailNotificationHandler implements EventHandler<UserRegisteredEvent> {
        private final String handlerId;

        public EmailNotificationHandler(String handlerId) {
            this.handlerId = handlerId;
        }

        @Override
        public EventProcessingResult handleEvent(UserRegisteredEvent event) throws Exception {
            // Simulate email sending
            Thread.sleep(200);

            if (Math.random() < 0.9) { // 90% success rate
                return new EventProcessingResult(
                    handlerId,
                    "UserRegisteredEvent",
                    true,
                    200,
                    "Welcome email sent to " + event.getEmail()
                );
            } else {
                throw new Exception("Email service unavailable");
            }
        }

        @Override
        public String getHandlerId() {
            return handlerId;
        }
    }

    public static class AuditLogHandler implements EventHandler<UserRegisteredEvent> {
        private final String handlerId;

        public AuditLogHandler(String handlerId) {
            this.handlerId = handlerId;
        }

        @Override
        public EventProcessingResult handleEvent(UserRegisteredEvent event) throws Exception {
            // Simulate audit logging
            Thread.sleep(50);

            return new EventProcessingResult(
                handlerId,
                "UserRegisteredEvent",
                true,
                50,
                "Audit log created for user " + event.getUserId()
            );
        }

        @Override
        public String getHandlerId() {
            return handlerId;
        }
    }

    public void shutdown() {
        eventProcessorExecutor.shutdown();
        try {
            if (!eventProcessorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                eventProcessorExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            eventProcessorExecutor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
Pros and Cons
Advantages:
- Loose Coupling: Subjects don’t need to know about specific observers
- Dynamic Relationships: Can add/remove observers at runtime
- Concurrent Processing: Multiple observers can process events simultaneously
- Scalability: Easy to add new event types and handlers
Disadvantages:
- Performance Overhead: Task submission, queueing, and context switching add cost to every notification
- Memory Leaks: Observers may not be properly cleaned up
- Ordering Issues: No guarantee of notification order
- Error Handling: A slow or failing observer occupies a pool thread and can delay notifications to other observers
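One way to address the ordering issue is to give each observer its own single-threaded executor, so a given observer receives events in the order they were published while different observers still run concurrently. The sketch below is a minimal illustration under that assumption; `OrderedNotifier` and its methods are hypothetical names, and it trades one thread per observer for the ordering guarantee. Order is only defined relative to `publish` calls made from a single thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration class; names are assumptions, not the article's API.
class OrderedNotifier<T> {
    // One single-threaded "lane" per observer keeps that observer's events in order
    private final Map<Consumer<T>, ExecutorService> lanes = new ConcurrentHashMap<>();

    void addObserver(Consumer<T> observer) {
        lanes.computeIfAbsent(observer, o -> Executors.newSingleThreadExecutor());
    }

    // Removing an observer also releases its thread, which avoids leaking lanes
    void removeObserver(Consumer<T> observer) {
        ExecutorService lane = lanes.remove(observer);
        if (lane != null) {
            lane.shutdown();
        }
    }

    void publish(T event) {
        lanes.forEach((observer, lane) -> lane.execute(() -> observer.accept(event)));
    }

    // Lets queued notifications finish; returns false on timeout or interruption.
    boolean shutdownAndWait(long timeoutMs) {
        boolean allDone = true;
        for (ExecutorService lane : lanes.values()) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes.values()) {
                allDone &= lane.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return allDone;
    }

    public static void main(String[] args) {
        OrderedNotifier<String> notifier = new OrderedNotifier<>();
        notifier.addObserver(e -> System.out.println("audit saw " + e));
        notifier.addObserver(e -> System.out.println("metrics saw " + e));
        notifier.publish("created");
        notifier.publish("updated");
        notifier.shutdownAndWait(1000);
    }
}
```

Each observer prints "created" before "updated"; how the two observers' lines interleave with each other is not defined.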
Use Cases:
- UI event handling systems
- Microservice event broadcasting
- Real-time monitoring systems
- Publish-subscribe message patterns
Command Pattern
Pattern Overview
The Command pattern encapsulates requests as objects, allowing for queuing, logging, and undo operations. In concurrent environments, it’s excellent for task scheduling and execution.
graph TD
A[Command Interface] --> B[ConcreteCommand1]
A --> C[ConcreteCommand2]
A --> D[ConcreteCommand3]
E[Invoker/Executor] --> F[CommandQueue]
F --> G[ExecutorService]
G --> H[Worker Thread 1]
G --> I[Worker Thread 2]
G --> J[Worker Thread 3]
H --> K["execute()"]
I --> K
J --> K
style A fill:#ff6b6b
style G fill:#4ecdc4
style K fill:#feca57
Implementation with Thread Interfaces
1. Concurrent Command Executor:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConcurrentCommandPattern {

    private final ExecutorService commandExecutor;
    private final BlockingQueue<CommandExecution> executionQueue;
    private final Map<String, CommandResult> results = new ConcurrentHashMap<>();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    public ConcurrentCommandPattern(int threadPoolSize, int queueCapacity) {
        this.commandExecutor = Executors.newFixedThreadPool(threadPoolSize);
        this.executionQueue = new ArrayBlockingQueue<>(queueCapacity);

        // Start result collector thread
        Thread resultCollector = new Thread(new ResultCollector());
        resultCollector.setDaemon(true);
        resultCollector.start();
    }

    // Command interface
    public interface Command {
        CommandResult execute() throws Exception;
        String getCommandId();
        String getDescription();
        boolean isUndoable();
        void undo() throws Exception;
    }

    // Command result wrapper
    public static class CommandResult {
        private final String commandId;
        private final boolean success;
        private final Object result;
        private final Exception exception;
        private final long executionTimeMs;

        public CommandResult(String commandId, boolean success, Object result,
                             Exception exception, long executionTimeMs) {
            this.commandId = commandId;
            this.success = success;
            this.result = result;
            this.exception = exception;
            this.executionTimeMs = executionTimeMs;
        }

        // Factory methods
        public static CommandResult success(String commandId, Object result, long executionTimeMs) {
            return new CommandResult(commandId, true, result, null, executionTimeMs);
        }

        public static CommandResult failure(String commandId, Exception exception, long executionTimeMs) {
            return new CommandResult(commandId, false, null, exception, executionTimeMs);
        }

        // Getters
        public String getCommandId() { return commandId; }
        public boolean isSuccess() { return success; }
        public Object getResult() { return result; }
        public Exception getException() { return exception; }
        public long getExecutionTimeMs() { return executionTimeMs; }

        @Override
        public String toString() {
            return String.format("CommandResult{id='%s', success=%s, time=%dms, result=%s}",
                commandId, success, executionTimeMs,
                success ? result : (exception != null ? exception.getMessage() : "null"));
        }
    }

    // Command execution wrapper
    private static class CommandExecution {
        private final Command command;
        private final CompletableFuture<CommandResult> future;

        public CommandExecution(Command command, CompletableFuture<CommandResult> future) {
            this.command = command;
            this.future = future;
        }

        public Command getCommand() { return command; }
        public CompletableFuture<CommandResult> getFuture() { return future; }
    }

    // Execute command asynchronously
    public CompletableFuture<CommandResult> executeAsync(Command command) {
        CompletableFuture<CommandResult> future = new CompletableFuture<>();

        commandExecutor.submit(new CommandExecutionTask(command, future));

        return future;
    }

    // Execute command synchronously with timeout
    public CommandResult executeSync(Command command, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<CommandResult> future = executeAsync(command);
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Command execution Runnable
    private class CommandExecutionTask implements Runnable {
        private final Command command;
        private final CompletableFuture<CommandResult> future;

        public CommandExecutionTask(Command command, CompletableFuture<CommandResult> future) {
            this.command = command;
            this.future = future;
        }

        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            boolean interrupted = false;
            CommandResult timedResult;

            try {
                System.out.println("Executing command: " + command.getDescription());
                CommandResult result = command.execute();

                // Replace the command's placeholder time with the measured duration
                long duration = System.currentTimeMillis() - startTime;
                timedResult = new CommandResult(
                    result.getCommandId(),
                    result.isSuccess(),
                    result.getResult(),
                    result.getException(),
                    duration
                );
            } catch (Exception e) {
                interrupted = e instanceof InterruptedException;
                long duration = System.currentTimeMillis() - startTime;
                timedResult = CommandResult.failure(command.getCommandId(), e, duration);
            }

            future.complete(timedResult);

            // Hand successes and failures alike to the result collector
            try {
                boolean queued = executionQueue.offer(
                    new CommandExecution(command, CompletableFuture.completedFuture(timedResult)),
                    1, TimeUnit.SECONDS);
                if (!queued) {
                    System.err.println("Result queue full, result not collected: " + timedResult);
                }
            } catch (InterruptedException e) {
                interrupted = true;
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Result collector Runnable
    private class ResultCollector implements Runnable {
        @Override
        public void run() {
            // Keep draining after shutdown is requested until the queue is empty
            while (!shutdown.get() || !executionQueue.isEmpty()) {
                try {
                    CommandExecution execution = executionQueue.poll(1, TimeUnit.SECONDS);
                    if (execution != null) {
                        CommandResult result = execution.getFuture().get();
                        results.put(result.getCommandId(), result);

                        System.out.println("Collected result: " + result);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (ExecutionException e) {
                    System.err.println("Error collecting result: " + e.getCause());
                }
            }
        }
    }

    // Get command result
    public CommandResult getResult(String commandId) {
        return results.get(commandId);
    }

    // Get all results
    public Map<String, CommandResult> getAllResults() {
        return new HashMap<>(results);
    }

    // Concrete command implementations
    public static class FileProcessingCommand implements Command {
        private final String commandId;
        private final String filePath;
        private final String operation;

        public FileProcessingCommand(String commandId, String filePath, String operation) {
            this.commandId = commandId;
            this.filePath = filePath;
            this.operation = operation;
        }

        @Override
        public CommandResult execute() throws Exception {
            // Simulate file processing
            Thread.sleep((long) (Math.random() * 1000) + 500);

            if (Math.random() < 0.1) { // 10% failure rate
                throw new Exception("File processing failed for: " + filePath);
            }

            String result = String.format("File %s processed with operation: %s", filePath, operation);
            return CommandResult.success(commandId, result, 0);
        }

        @Override
        public String getCommandId() {
            return commandId;
        }

        @Override
        public String getDescription() {
            return String.format("FileProcessing[%s]: %s on %s", commandId, operation, filePath);
        }

        @Override
        public boolean isUndoable() {
            return "DELETE".equalsIgnoreCase(operation);
        }

        @Override
        public void undo() throws Exception {
            if (isUndoable()) {
                System.out.println("Undoing delete operation for: " + filePath);
                // Simulate restore operation
                Thread.sleep(200);
            } else {
                throw new UnsupportedOperationException("Cannot undo operation: " + operation);
            }
        }
    }

    public static class DatabaseCommand implements Command {
        private final String commandId;
        private final String sql;
        private final String operation;

        public DatabaseCommand(String commandId, String sql, String operation) {
            this.commandId = commandId;
            this.sql = sql;
            this.operation = operation;
        }

        @Override
        public CommandResult execute() throws Exception {
            // Simulate database operation
            Thread.sleep((long) (Math.random() * 500) + 100);

            if (sql.contains("DROP") && Math.random() < 0.3) {
                throw new Exception("Database operation failed: " + sql);
            }

            String result = String.format("Database %s executed: %s", operation, sql);
            return CommandResult.success(commandId, result, 0);
        }

        @Override
        public String getCommandId() {
            return commandId;
        }

        @Override
        public String getDescription() {
            return String.format("Database[%s]: %s", commandId, sql);
        }

        @Override
        public boolean isUndoable() {
            return operation.equalsIgnoreCase("INSERT") || operation.equalsIgnoreCase("UPDATE");
        }

        @Override
        public void undo() throws Exception {
            if (isUndoable()) {
                System.out.println("Undoing database operation: " + sql);
                Thread.sleep(150);
            } else {
                throw new UnsupportedOperationException("Cannot undo database operation: " + operation);
            }
        }
    }

    public void shutdown() {
        commandExecutor.shutdown();

        try {
            if (!commandExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                commandExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            commandExecutor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }

        // Signal the collector only after the workers have stopped, so late results are still drained
        shutdown.set(true);
    }
}
Pros and Cons
Advantages:
- Decoupling: Invoker doesn’t need to know command implementation details
- Queuing: Commands can be queued, scheduled, and batch processed
- Undo Support: Easy to implement undo/redo functionality
- Logging: Command execution can be logged and audited
- Macro Commands: Can combine multiple commands into composite operations
Disadvantages:
- Complexity: Adds layers of abstraction
- Memory Usage: Each command is an object consuming memory
- Performance: Object creation overhead for simple operations
- Thread Safety: Commands must be thread-safe if shared
Use Cases:
- Task scheduling systems
- Undo/redo functionality
- Batch processing frameworks
- Request queuing in web applications
- Database transaction management
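The macro-command advantage listed above can be sketched as a composite that runs its children in order and undoes them in reverse. This is a minimal sketch under stated assumptions, not the article's implementation: `SimpleCommand` is a hypothetical, reduced stand-in for the `Command` interface (no `CommandResult`, IDs, or `isUndoable()`), and `MacroCommand` and `MacroCommandDemo` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical, reduced stand-in for the article's Command interface.
interface SimpleCommand {
    void execute() throws Exception;
    void undo() throws Exception;
}

// Composite command: runs its children in order, undoes them in reverse order.
final class MacroCommand implements SimpleCommand {
    private final List<SimpleCommand> children;
    private final Deque<SimpleCommand> executed = new ArrayDeque<>();

    MacroCommand(List<SimpleCommand> children) {
        this.children = new ArrayList<>(children);
    }

    @Override
    public synchronized void execute() throws Exception {
        for (SimpleCommand child : children) {
            try {
                child.execute();
                executed.push(child); // remember only children that succeeded
            } catch (Exception e) {
                undo(); // roll back the children that already ran
                throw e;
            }
        }
    }

    @Override
    public synchronized void undo() throws Exception {
        while (!executed.isEmpty()) {
            executed.pop().undo(); // most recently executed child first
        }
    }
}

final class MacroCommandDemo {
    // Builds a command that records its execute and undo calls in a shared log.
    static SimpleCommand logging(String name, List<String> log) {
        return new SimpleCommand() {
            @Override public void execute() { log.add("do " + name); }
            @Override public void undo() { log.add("undo " + name); }
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MacroCommand macro = new MacroCommand(Arrays.asList(logging("a", log), logging("b", log)));
        try {
            macro.execute();
            macro.undo();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [do a, do b, undo b, undo a]
    }
}
```

The `synchronized` methods are one simple way to satisfy the thread-safety caveat when a macro is shared between threads; a command that is confined to a single worker would not need them.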
Strategy Pattern
Pattern Overview
The Strategy pattern defines a family of algorithms, encapsulates each one behind a common interface, and makes them interchangeable. In concurrent code this pays off a second time: because strategies are independent of each other, several can run in parallel, race against one another, or act as fallbacks when a preferred strategy fails.
Implementation with Thread Interfaces
1. Concurrent Strategy Execution:
public class ConcurrentStrategyPattern {

    private final ExecutorService strategyExecutor;
    private final CompletionService<StrategyResult> completionService;

    public ConcurrentStrategyPattern(int threadPoolSize) {
        this.strategyExecutor = Executors.newFixedThreadPool(threadPoolSize);
        this.completionService = new ExecutorCompletionService<>(strategyExecutor);
    }

    // Strategy interface
    public interface Strategy<T, R> {
        R execute(T input) throws Exception;
        String getStrategyName();
        int getPriority(); // Higher number = higher priority
        long getEstimatedExecutionTime();
    }

    // Strategy execution context
    public static class StrategyContext<T> {
        private final T input;
        private final List<Strategy<T, ?>> strategies;
        private final ExecutionMode mode;

        public enum ExecutionMode {
            PARALLEL,    // Execute all strategies in parallel
            RACE,        // Return first successful result
            FALLBACK,    // Try strategies in priority order until one succeeds
            BEST_EFFORT  // Execute all and return best result based on criteria
        }

        public StrategyContext(T input, List<Strategy<T, ?>> strategies, ExecutionMode mode) {
            this.input = input;
            this.strategies = new ArrayList<>(strategies);
            this.mode = mode;
        }

        public T getInput() { return input; }
        public List<Strategy<T, ?>> getStrategies() { return strategies; }
        public ExecutionMode getMode() { return mode; }
    }

    // Strategy execution result
    public static class StrategyResult {
        private final String strategyName;
        private final Object result;
        private final boolean success;
        private final Exception exception;
        private final long executionTimeMs;

        public StrategyResult(String strategyName, Object result, boolean success,
                              Exception exception, long executionTimeMs) {
            this.strategyName = strategyName;
            this.result = result;
            this.success = success;
            this.exception = exception;
            this.executionTimeMs = executionTimeMs;
        }

        public static StrategyResult success(String strategyName, Object result, long executionTimeMs) {
            return new StrategyResult(strategyName, result, true, null, executionTimeMs);
        }

        public static StrategyResult failure(String strategyName, Exception exception, long executionTimeMs) {
            return new StrategyResult(strategyName, null, false, exception, executionTimeMs);
        }

        // Getters
        public String getStrategyName() { return strategyName; }
        public Object getResult() { return result; }
        public boolean isSuccess() { return success; }
        public Exception getException() { return exception; }
        public long getExecutionTimeMs() { return executionTimeMs; }

        @Override
        public String toString() {
            return String.format("StrategyResult{strategy='%s', success=%s, time=%dms, result=%s}",
                strategyName, success, executionTimeMs,
                success ? result : (exception != null ? exception.getMessage() : "null"));
        }
    }

    // Execute strategies based on context
    public <T> CompletableFuture<List<StrategyResult>> executeStrategies(StrategyContext<T> context) {
        switch (context.getMode()) {
            case PARALLEL:
                return executeParallel(context);
            case RACE:
                return executeRace(context);
            case FALLBACK:
                return executeFallback(context);
            case BEST_EFFORT:
                return executeBestEffort(context);
            default:
                throw new IllegalArgumentException("Unsupported execution mode: " + context.getMode());
        }
    }

    // Execute all strategies in parallel
    @SuppressWarnings("unchecked")
    private <T> CompletableFuture<List<StrategyResult>> executeParallel(StrategyContext<T> context) {
        List<CompletableFuture<StrategyResult>> futures = new ArrayList<>();

        for (Strategy<T, ?> strategy : context.getStrategies()) {
            CompletableFuture<StrategyResult> future = CompletableFuture.supplyAsync(() -> {
                return executeStrategy((Strategy<T, Object>) strategy, context.getInput());
            }, strategyExecutor);

            futures.add(future);
        }

        // allOf completes once every future is done, so join() below never blocks
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    // Execute strategies in race mode (first successful result wins)
    @SuppressWarnings("unchecked")
    private <T> CompletableFuture<List<StrategyResult>> executeRace(StrategyContext<T> context) {
        CompletableFuture<StrategyResult> raceFuture = new CompletableFuture<>();
        AtomicInteger completedCount = new AtomicInteger(0);
        List<StrategyResult> allResults = new CopyOnWriteArrayList<>();
        int total = context.getStrategies().size();

        // With no strategies nothing would ever complete raceFuture, so return early
        if (total == 0) {
            return CompletableFuture.completedFuture(allResults);
        }

        for (Strategy<T, ?> strategy : context.getStrategies()) {
            CompletableFuture.supplyAsync(() -> {
                return executeStrategy((Strategy<T, Object>) strategy, context.getInput());
            }, strategyExecutor)
            .whenComplete((result, throwable) -> {
                // executeStrategy turns exceptions into failure results; a non-null
                // throwable means an Error escaped, so record that as a failure too
                StrategyResult outcome = (throwable == null)
                    ? result
                    : StrategyResult.failure(strategy.getStrategyName(), new Exception(throwable), 0);
                allResults.add(outcome);

                // complete() only takes effect on the first call, so no isDone() check is needed
                if (outcome.isSuccess()) {
                    raceFuture.complete(outcome);
                }

                // If all strategies completed and none succeeded, complete with the last result
                if (completedCount.incrementAndGet() == total) {
                    raceFuture.complete(outcome);
                }
            });
        }

        return raceFuture.thenApply(winningResult -> {
            System.out.println("Race winner: " + winningResult.getStrategyName());
            // Snapshot of the results so far; slower strategies may still be running
            List<StrategyResult> snapshot = new ArrayList<>(allResults);
            return snapshot;
        });
    }

    // Execute strategies in fallback mode (priority order until success)
    @SuppressWarnings("unchecked")
    private <T> CompletableFuture<List<StrategyResult>> executeFallback(StrategyContext<T> context) {
        // Highest priority first
        List<Strategy<T, ?>> sortedStrategies = context.getStrategies().stream()
            .sorted((s1, s2) -> Integer.compare(s2.getPriority(), s1.getPriority()))
            .collect(Collectors.toList());

        return CompletableFuture.supplyAsync(() -> {
            List<StrategyResult> results = new ArrayList<>();

            for (Strategy<T, ?> strategy : sortedStrategies) {
                StrategyResult result = executeStrategy((Strategy<T, Object>) strategy, context.getInput());
                results.add(result);

                if (result.isSuccess()) {
                    System.out.println("Fallback success with strategy: " + strategy.getStrategyName());
                    break;
                }
            }

            return results;
        }, strategyExecutor);
    }

    // Execute all strategies and report the fastest successful result
    private <T> CompletableFuture<List<StrategyResult>> executeBestEffort(StrategyContext<T> context) {
        return executeParallel(context).thenApply(results -> {
            // null when the list is empty or no strategy succeeded
            StrategyResult bestResult = results.stream()
                .filter(StrategyResult::isSuccess)
                .min(Comparator.comparing(StrategyResult::getExecutionTimeMs))
                .orElse(null);

            if (bestResult != null) {
                System.out.println("Best effort result: " + bestResult.getStrategyName() +
                    " (time: " + bestResult.getExecutionTimeMs() + "ms)");
            } else {
                System.out.println("Best effort: no strategy succeeded");
            }

            return results;
        });
    }

    // Execute single strategy, converting any exception into a failure result
    private <T> StrategyResult executeStrategy(Strategy<T, Object> strategy, T input) {
        long startTime = System.currentTimeMillis();

        try {
            Object result = strategy.execute(input);
            long duration = System.currentTimeMillis() - startTime;

            return StrategyResult.success(strategy.getStrategyName(), result, duration);

        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the pool
            }
            long duration = System.currentTimeMillis() - startTime;
            return StrategyResult.failure(strategy.getStrategyName(), e, duration);
        }
    }

    // Example strategy implementations for data processing
    public static class FastProcessingStrategy implements Strategy<String, String> {
        @Override
        public String execute(String input) throws Exception {
            Thread.sleep(100); // Fast processing
            return "Fast result: " + input.toUpperCase();
        }

        @Override
        public String getStrategyName() {
            return "FastProcessing";
        }

        @Override
        public int getPriority() {
            return 3;
        }

        @Override
        public long getEstimatedExecutionTime() {
            return 100;
        }
    }

    public static class AccurateProcessingStrategy implements Strategy<String, String> {
        @Override
        public String execute(String input) throws Exception {
            Thread.sleep(500); // Slower but more accurate

            // Simulate a failure about 10% of the time
            if (Math.random() < 0.1) {
                throw new Exception("Processing failed");
            }

            return "Accurate result: " + input.toLowerCase().replace(" ", "_");
        }

        @Override
        public String getStrategyName() {
            return "AccurateProcessing";
        }

        @Override
        public int getPriority() {
            return 2;
        }

        @Override
        public long getEstimatedExecutionTime() {
            return 500;
        }
    }

    public static class FallbackProcessingStrategy implements Strategy<String, String> {
        @Override
        public String execute(String input) throws Exception {
            Thread.sleep(50); // Very fast fallback
            return "Fallback result: " + input;
        }

        @Override
        public String getStrategyName() {
            return "FallbackProcessing";
        }

        @Override
        public int getPriority() {
            return 1; // Lowest priority
        }

        @Override
        public long getEstimatedExecutionTime() {
            return 50;
        }
    }

    public void shutdown() {
        strategyExecutor.shutdown();
        try {
            if (!strategyExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                strategyExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            strategyExecutor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
        }
    }
}
Pros and Cons
Advantages:
- Flexibility: Easy to add new strategies without modifying existing code
- Performance: Can execute strategies in parallel for faster results
- Resilience: Fallback strategies provide fault tolerance
- Optimization: Can choose best strategy based on runtime conditions
Disadvantages:
- Complexity: Multiple execution modes add complexity
- Resource Usage: Parallel execution consumes more resources
- Coordination Overhead: Managing multiple strategy executions
- Decision Logic: Choosing the right execution mode can be complex
Use Cases:
- Algorithm selection based on data characteristics
- Load balancing across different service endpoints
- Fault-tolerant processing with fallback options
- Performance optimization through parallel execution
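For the simplest form of RACE semantics, the JDK already offers `ExecutorService.invokeAny`, which returns the result of one task that completed successfully and cancels the tasks that have not finished. The sketch below is an alternative to the hand-rolled race above rather than a drop-in replacement: it returns only the winning value, not a list of `StrategyResult` objects. `InvokeAnyRace` and the three lambdas are hypothetical stand-ins for real strategies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class InvokeAnyRace {
    static String race() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // Three competing "strategies": one fails, one is quick, one is slow.
            List<Callable<String>> candidates = new ArrayList<>();
            candidates.add(() -> { throw new IllegalStateException("always fails"); });
            candidates.add(() -> { Thread.sleep(50); return "fast"; });
            candidates.add(() -> { Thread.sleep(2000); return "slow"; });

            // Blocks until one task returns normally; unfinished tasks are then cancelled.
            return pool.invokeAny(candidates);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while racing", e);
        } catch (ExecutionException e) {
            // Thrown only when every candidate failed.
            throw new IllegalStateException("no candidate succeeded", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(race());
    }
}
```

The failing candidate does not end the race, because `invokeAny` only counts normal completion as a win. The trade-off is that it blocks the calling thread, whereas the `CompletableFuture` version stays asynchronous and keeps the losing results for inspection.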
Enterprise Patterns
Thread Pool Manager Pattern
public class EnterpriseThreadPoolManager {

    private final Map<String, ExecutorService> threadPools = new ConcurrentHashMap<>();
    private final Map<String, ThreadPoolMetrics> metrics = new ConcurrentHashMap<>();
    private final ScheduledExecutorService monitoringExecutor;

    public EnterpriseThreadPoolManager() {
        this.monitoringExecutor = Executors.newScheduledThreadPool(1);
        startMonitoring();
    }

    // Thread pool configuration
    public static class ThreadPoolConfig {
        private final String poolName;
        private final int corePoolSize;
        private final int maximumPoolSize;
        private final long keepAliveTime;
        private final TimeUnit timeUnit;
        private final int queueCapacity;
        private final boolean allowCoreThreadTimeOut;

        public ThreadPoolConfig(String poolName, int corePoolSize, int maximumPoolSize,
                                long keepAliveTime, TimeUnit timeUnit, int queueCapacity,
                                boolean allowCoreThreadTimeOut) {
            this.poolName = poolName;
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.keepAliveTime = keepAliveTime;
            this.timeUnit = timeUnit;
            this.queueCapacity = queueCapacity;
            this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
        }

        // Getters
        public String getPoolName() { return poolName; }
        public int getCorePoolSize() { return corePoolSize; }
        public int getMaximumPoolSize() { return maximumPoolSize; }
        public long getKeepAliveTime() { return keepAliveTime; }
        public TimeUnit getTimeUnit() { return timeUnit; }
        public int getQueueCapacity() { return queueCapacity; }
        public boolean isAllowCoreThreadTimeOut() { return allowCoreThreadTimeOut; }
    }

    // Create thread pool with configuration
    public void createThreadPool(ThreadPoolConfig config) {
        String poolName = config.getPoolName();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            config.getCorePoolSize(),
            config.getMaximumPoolSize(),
            config.getKeepAliveTime(),
            config.getTimeUnit(),
            new ArrayBlockingQueue<>(config.getQueueCapacity()),
            new NamedThreadFactory(poolName),
            new ThreadPoolExecutor.CallerRunsPolicy() // when saturated, the submitting thread runs the task
        );

        // Passing true requires a keep-alive time greater than zero
        executor.allowCoreThreadTimeOut(config.isAllowCoreThreadTimeOut());

        // Register metrics before the pool so the monitor never sees a pool without metrics
        metrics.putIfAbsent(poolName, new ThreadPoolMetrics(poolName));
        if (threadPools.putIfAbsent(poolName, executor) != null) {
            executor.shutdown(); // discard the new pool instead of silently replacing a live one
            throw new IllegalArgumentException("Thread pool already exists: " + poolName);
        }

        System.out.println("Created thread pool: " + poolName);
    }

    // Get thread pool
    public ExecutorService getThreadPool(String poolName) {
        return threadPools.get(poolName);
    }

    // Submit task to specific pool
    public <T> Future<T> submitTask(String poolName, Callable<T> task) {
        ExecutorService executor = threadPools.get(poolName);
        if (executor == null) {
            throw new IllegalArgumentException("Thread pool not found: " + poolName);
        }

        ThreadPoolMetrics poolMetrics = metrics.get(poolName);
        poolMetrics.incrementSubmitted();

        return executor.submit(() -> {
            long startTime = System.currentTimeMillis();
            try {
                T result = task.call();
                long duration = System.currentTimeMillis() - startTime;
                poolMetrics.recordExecution(duration, true);
                return result;
            } catch (Exception e) {
                long duration = System.currentTimeMillis() - startTime;
                poolMetrics.recordExecution(duration, false);
                // Rethrow as-is: Future.get() then reports the original exception as the cause
                throw e;
            }
        });
    }

    // Thread pool metrics
    private static class ThreadPoolMetrics {
        private final String poolName;
        private final AtomicLong submittedTasks = new AtomicLong(0);
        private final AtomicLong completedTasks = new AtomicLong(0);
        private final AtomicLong failedTasks = new AtomicLong(0);
        private final AtomicLong totalExecutionTime = new AtomicLong(0);

        public ThreadPoolMetrics(String poolName) {
            this.poolName = poolName;
        }

        public void incrementSubmitted() {
            submittedTasks.incrementAndGet();
        }

        // Counts every finished task as completed; failures are also counted separately
        public void recordExecution(long durationMs, boolean success) {
            completedTasks.incrementAndGet();
            totalExecutionTime.addAndGet(durationMs);

            if (!success) {
                failedTasks.incrementAndGet();
            }
        }

        public String getPoolName() { return poolName; }
        public long getSubmittedTasks() { return submittedTasks.get(); }
        public long getCompletedTasks() { return completedTasks.get(); }
        public long getFailedTasks() { return failedTasks.get(); }
        public double getAverageExecutionTime() {
            long completed = completedTasks.get();
            return completed > 0 ? (double) totalExecutionTime.get() / completed : 0.0;
        }

        @Override
        public String toString() {
            return String.format("ThreadPoolMetrics{pool='%s', submitted=%d, completed=%d, failed=%d, avgTime=%.2fms}",
                poolName, submittedTasks.get(), completedTasks.get(), failedTasks.get(), getAverageExecutionTime());
        }
    }

    // Named thread factory
    private static class NamedThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        public NamedThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, namePrefix + "-thread-" + threadNumber.getAndIncrement());
            thread.setDaemon(false);
            return thread;
        }
    }

    // Monitoring: print pool statistics every 5 seconds
    private void startMonitoring() {
        monitoringExecutor.scheduleAtFixedRate(() -> {
            for (Map.Entry<String, ExecutorService> entry : threadPools.entrySet()) {
                String poolName = entry.getKey();
                ExecutorService executor = entry.getValue();

                if (executor instanceof ThreadPoolExecutor) {
                    ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
                    ThreadPoolMetrics poolMetrics = metrics.get(poolName);

                    // Pass poolMetrics itself rather than calling toString(): an exception
                    // thrown here would cancel all further runs of this scheduled task
                    System.out.printf("Pool: %s | Active: %d | Queue: %d | Completed: %d | %s%n",
                        poolName,
                        tpe.getActiveCount(),
                        tpe.getQueue().size(),
                        tpe.getCompletedTaskCount(),
                        poolMetrics
                    );
                }
            }
            System.out.println("---");
        }, 5, 5, TimeUnit.SECONDS);
    }

    // Initiates an orderly shutdown; does not wait for running tasks to finish
    public void shutdown() {
        for (ExecutorService executor : threadPools.values()) {
            executor.shutdown();
        }
        monitoringExecutor.shutdown();
    }
}
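The manager above builds every pool with a bounded queue and `ThreadPoolExecutor.CallerRunsPolicy`, which acts as a simple form of backpressure: once the workers and the queue are full, the submitting thread runs the task itself and is slowed down as a result. The following sketch illustrates that behavior in isolation. `CallerRunsDemo`, the thread name `worker`, and the pool sizes (one worker, one queue slot) are assumptions chosen to make the effect easy to see, not values from the article.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CallerRunsDemo {
    // Returns entries of the form "taskN:<thread name>" for the three submitted tasks.
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),            // a single queue slot
            r -> new Thread(r, "worker"),           // one named worker thread
            new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<String> ranOn = new CopyOnWriteArrayList<>();

        // Task 1 occupies the only worker until the latch opens.
        pool.execute(() -> {
            ranOn.add("task1:" + Thread.currentThread().getName());
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Task 2 fills the single queue slot.
        pool.execute(() -> ranOn.add("task2:" + Thread.currentThread().getName()));
        // Task 3 is rejected, so CallerRunsPolicy runs it on the submitting thread.
        pool.execute(() -> ranOn.add("task3:" + Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Tasks 1 and 2 are recorded against `worker`, while task 3 is recorded against whichever thread called `run()`. In a request-handling thread that can be an unwelcome stall, so `AbortPolicy` with explicit handling of `RejectedExecutionException` is a reasonable alternative when callers must not block.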
Pattern Comparison Summary
When to Use Each Pattern
graph TD
A[Concurrency Pattern Selection] --> B{Data Flow Type?}
B -->|Producer-Consumer| C[Producer-Consumer Pattern]
B -->|Event Driven| D[Observer Pattern]
B -->|Task Based| E[Command Pattern]
B -->|Algorithm Choice| F[Strategy Pattern]
C --> G[High throughput data processing]
C --> H[Message queue systems]
D --> I[Event broadcasting]
D --> J[State change notifications]
E --> K[Task scheduling]
E --> L[Undo/redo operations]
F --> M[Algorithm selection]
F --> N[Fault tolerance]
style C fill:#ff6b6b
style D fill:#4ecdc4
style E fill:#feca57
style F fill:#45b7d1
Pattern | Best For | Thread Interface | Complexity | Performance |
---|---|---|---|---|
Producer-Consumer | Data streaming, Queue processing | Runnable, BlockingQueue | Medium | High throughput |
Observer | Event systems, State changes | Runnable, CompletableFuture | Medium | Medium latency |
Command | Task scheduling, Undo operations | Callable, ExecutorService | High | Variable |
Strategy | Algorithm selection, Fault tolerance | Callable, CompletableFuture | High | Optimized |
Thread Pool Manager | Enterprise resource management | All interfaces | High | Highly optimized |
Conclusion
Design patterns combined with Java thread interfaces provide powerful abstractions for building robust concurrent systems. Each pattern serves specific use cases and offers unique advantages in concurrent environments.
Key principles for success:
- Choose the Right Pattern: Match pattern to your specific concurrency needs
- Leverage Thread Interfaces: Use Runnable for fire-and-forget, Callable for results
- Plan for Failures: Implement proper error handling and fallback strategies
- Monitor Performance: Track metrics and optimize based on actual usage
- Test Thoroughly: Concurrent patterns require extensive testing under load
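The Runnable versus Callable guideline above comes down to whether the caller needs a handle on the outcome. A minimal sketch, assuming a single-threaded executor and the hypothetical class name `RunnableVsCallable`:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class RunnableVsCallable {
    static int demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            AtomicInteger sideEffects = new AtomicInteger();

            // Runnable: fire-and-forget, only observable through its side effects.
            Runnable fireAndForget = () -> sideEffects.incrementAndGet();
            pool.execute(fireAndForget);

            // Callable: returns a value (or an exception) through a Future.
            Callable<Integer> compute = () -> 6 * 7;
            Future<Integer> answer = pool.submit(compute);

            return answer.get(); // blocks until the Callable has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // The Callable's own exception is available as the cause.
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 42
    }
}
```

Failure handling differs as well: an exception thrown by a task passed to `execute` goes to the worker thread's uncaught exception handler, while an exception thrown by a submitted `Callable` is captured and surfaces as an `ExecutionException` from `Future.get()`.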
Master these patterns and you’ll be equipped to handle complex concurrent programming challenges in enterprise Java applications.