Java Concurrency Part 3: Design Patterns with Thread Interfaces - Producer-Consumer, Observer, and Enterprise Patterns

๐ŸŽฏ Introduction

Building upon our deep dive into Java concurrency fundamentals, this third part explores how classic design patterns can be elegantly implemented using thread interfaces. We’ll examine how Runnable, Callable, and other concurrency primitives can be combined with design patterns to create robust, scalable, and maintainable concurrent systems.

This guide demonstrates practical implementations of essential design patterns in concurrent environments, showing how threading interfaces enhance traditional patterns while addressing the unique challenges of multi-threaded programming.

๐Ÿญ Producer-Consumer Pattern

๐Ÿ“‹ Pattern Overview

The Producer-Consumer pattern is fundamental in concurrent programming, allowing producers to generate data while consumers process it independently, with proper synchronization and buffering.

graph TD
    A[Producer Threads] --> B[Shared Buffer/Queue]
    B --> C[Consumer Threads]

    D[Producer 1] --> E[BlockingQueue]
    F[Producer 2] --> E
    G[Producer 3] --> E

    E --> H[Consumer 1]
    E --> I[Consumer 2]
    E --> J[Consumer 3]

    K[Synchronization] --> L[put() blocks when full]
    K --> M[take() blocks when empty]
    K --> N[Thread-safe operations]

    style A fill:#ff6b6b
    style C fill:#4ecdc4
    style E fill:#feca57

๐Ÿ› ๏ธ Implementation with Thread Interfaces

1. Basic Producer-Consumer with BlockingQueue:

  1public class ProducerConsumerPattern {
  2
  3    private final BlockingQueue<Task> taskQueue;
  4    private final AtomicBoolean shutdown = new AtomicBoolean(false);
  5    private final List<Thread> producerThreads = new ArrayList<>();
  6    private final List<Thread> consumerThreads = new ArrayList<>();
  7
  8    public ProducerConsumerPattern(int queueCapacity) {
  9        this.taskQueue = new ArrayBlockingQueue<>(queueCapacity);
 10    }
 11
 12    // Producer implementation using Runnable
 13    public class Producer implements Runnable {
 14        private final String producerId;
 15        private final TaskGenerator taskGenerator;
 16        private final int productionRate;
 17
 18        public Producer(String producerId, TaskGenerator taskGenerator, int productionRate) {
 19            this.producerId = producerId;
 20            this.taskGenerator = taskGenerator;
 21            this.productionRate = productionRate;
 22        }
 23
 24        @Override
 25        public void run() {
 26            try {
 27                while (!shutdown.get() && !Thread.currentThread().isInterrupted()) {
 28                    Task task = taskGenerator.generateTask();
 29
 30                    // Blocking put - waits if queue is full
 31                    boolean offered = taskQueue.offer(task, 1, TimeUnit.SECONDS);
 32
 33                    if (offered) {
 34                        System.out.println(producerId + " produced: " + task);
 35                    } else {
 36                        System.out.println(producerId + " queue full, task rejected: " + task);
 37                    }
 38
 39                    // Control production rate
 40                    Thread.sleep(1000 / productionRate);
 41                }
 42            } catch (InterruptedException e) {
 43                Thread.currentThread().interrupt();
 44                System.out.println(producerId + " interrupted");
 45            }
 46            System.out.println(producerId + " shutdown");
 47        }
 48    }
 49
 50    // Consumer implementation using Runnable
 51    public class Consumer implements Runnable {
 52        private final String consumerId;
 53        private final TaskProcessor taskProcessor;
 54
 55        public Consumer(String consumerId, TaskProcessor taskProcessor) {
 56            this.consumerId = consumerId;
 57            this.taskProcessor = taskProcessor;
 58        }
 59
 60        @Override
 61        public void run() {
 62            try {
 63                while (!shutdown.get() || !taskQueue.isEmpty()) {
 64                    // Blocking take - waits if queue is empty
 65                    Task task = taskQueue.poll(1, TimeUnit.SECONDS);
 66
 67                    if (task != null) {
 68                        System.out.println(consumerId + " consuming: " + task);
 69                        taskProcessor.processTask(task);
 70                        System.out.println(consumerId + " completed: " + task);
 71                    }
 72                }
 73            } catch (InterruptedException e) {
 74                Thread.currentThread().interrupt();
 75                System.out.println(consumerId + " interrupted");
 76            }
 77            System.out.println(consumerId + " shutdown");
 78        }
 79    }
 80
 81    // Start the producer-consumer system
 82    public void start(int producerCount, int consumerCount) {
 83        // Start producers
 84        for (int i = 0; i < producerCount; i++) {
 85            Producer producer = new Producer(
 86                "Producer-" + i,
 87                new RandomTaskGenerator(),
 88                2 // 2 tasks per second
 89            );
 90            Thread producerThread = new Thread(producer);
 91            producerThreads.add(producerThread);
 92            producerThread.start();
 93        }
 94
 95        // Start consumers
 96        for (int i = 0; i < consumerCount; i++) {
 97            Consumer consumer = new Consumer(
 98                "Consumer-" + i,
 99                new DefaultTaskProcessor()
100            );
101            Thread consumerThread = new Thread(consumer);
102            consumerThreads.add(consumerThread);
103            consumerThread.start();
104        }
105    }
106
107    public void shutdown() throws InterruptedException {
108        shutdown.set(true);
109
110        // Wait for producers to finish
111        for (Thread producer : producerThreads) {
112            producer.join();
113        }
114
115        // Wait for consumers to finish processing remaining tasks
116        for (Thread consumer : consumerThreads) {
117            consumer.join();
118        }
119
120        System.out.println("All threads shutdown. Remaining tasks: " + taskQueue.size());
121    }
122
123    // Supporting classes
124    public static class Task {
125        private final String id;
126        private final String data;
127        private final long timestamp;
128
129        public Task(String id, String data) {
130            this.id = id;
131            this.data = data;
132            this.timestamp = System.currentTimeMillis();
133        }
134
135        @Override
136        public String toString() {
137            return "Task{id='" + id + "', data='" + data + "'}";
138        }
139
140        // Getters
141        public String getId() { return id; }
142        public String getData() { return data; }
143        public long getTimestamp() { return timestamp; }
144    }
145
146    public interface TaskGenerator {
147        Task generateTask();
148    }
149
150    public interface TaskProcessor {
151        void processTask(Task task) throws InterruptedException;
152    }
153
154    public static class RandomTaskGenerator implements TaskGenerator {
155        private final AtomicInteger counter = new AtomicInteger(0);
156        private final Random random = new Random();
157
158        @Override
159        public Task generateTask() {
160            return new Task(
161                "TASK-" + counter.incrementAndGet(),
162                "Data-" + random.nextInt(1000)
163            );
164        }
165    }
166
167    public static class DefaultTaskProcessor implements TaskProcessor {
168        @Override
169        public void processTask(Task task) throws InterruptedException {
170            // Simulate processing time
171            Thread.sleep(100 + (long)(Math.random() * 200));
172        }
173    }
174}

2. Advanced Producer-Consumer with Callable Results:

  1public class AsyncProducerConsumerPattern {
  2
  3    private final BlockingQueue<Future<ProcessingResult>> resultQueue;
  4    private final ExecutorService producerExecutor;
  5    private final ExecutorService consumerExecutor;
  6    private final CompletionService<ProcessingResult> completionService;
  7
  8    public AsyncProducerConsumerPattern(int poolSize) {
  9        this.resultQueue = new LinkedBlockingQueue<>();
 10        this.producerExecutor = Executors.newFixedThreadPool(poolSize);
 11        this.consumerExecutor = Executors.newFixedThreadPool(poolSize);
 12        this.completionService = new ExecutorCompletionService<>(consumerExecutor);
 13    }
 14
 15    // Producer that submits Callable tasks
 16    public class AsyncProducer implements Runnable {
 17        private final String producerId;
 18        private final List<ProcessingTask> tasksToProcess;
 19
 20        public AsyncProducer(String producerId, List<ProcessingTask> tasksToProcess) {
 21            this.producerId = producerId;
 22            this.tasksToProcess = tasksToProcess;
 23        }
 24
 25        @Override
 26        public void run() {
 27            for (ProcessingTask task : tasksToProcess) {
 28                // Submit Callable task to completion service
 29                Future<ProcessingResult> future = completionService.submit(new ProcessingCallable(task));
 30
 31                try {
 32                    resultQueue.put(future);
 33                    System.out.println(producerId + " submitted task: " + task.getId());
 34                } catch (InterruptedException e) {
 35                    Thread.currentThread().interrupt();
 36                    break;
 37                }
 38            }
 39        }
 40    }
 41
 42    // Consumer that processes Future results
 43    public class AsyncConsumer implements Runnable {
 44        private final String consumerId;
 45        private final int expectedResults;
 46
 47        public AsyncConsumer(String consumerId, int expectedResults) {
 48            this.consumerId = consumerId;
 49            this.expectedResults = expectedResults;
 50        }
 51
 52        @Override
 53        public void run() {
 54            int processedCount = 0;
 55
 56            while (processedCount < expectedResults) {
 57                try {
 58                    // Get completed future from completion service
 59                    Future<ProcessingResult> completedFuture = completionService.take();
 60                    ProcessingResult result = completedFuture.get();
 61
 62                    System.out.println(consumerId + " processed result: " + result);
 63                    processedCount++;
 64
 65                } catch (InterruptedException e) {
 66                    Thread.currentThread().interrupt();
 67                    break;
 68                } catch (ExecutionException e) {
 69                    System.err.println(consumerId + " execution error: " + e.getCause());
 70                }
 71            }
 72
 73            System.out.println(consumerId + " completed processing " + processedCount + " results");
 74        }
 75    }
 76
 77    // Callable implementation for processing tasks
 78    public static class ProcessingCallable implements Callable<ProcessingResult> {
 79        private final ProcessingTask task;
 80
 81        public ProcessingCallable(ProcessingTask task) {
 82            this.task = task;
 83        }
 84
 85        @Override
 86        public ProcessingResult call() throws Exception {
 87            // Simulate complex processing
 88            Thread.sleep((long)(Math.random() * 1000) + 500);
 89
 90            if (Math.random() < 0.1) { // 10% failure rate
 91                throw new ProcessingException("Processing failed for task: " + task.getId());
 92            }
 93
 94            return new ProcessingResult(
 95                task.getId(),
 96                "Processed: " + task.getData(),
 97                System.currentTimeMillis()
 98            );
 99        }
100    }
101
102    // Supporting classes
103    public static class ProcessingTask {
104        private final String id;
105        private final String data;
106
107        public ProcessingTask(String id, String data) {
108            this.id = id;
109            this.data = data;
110        }
111
112        public String getId() { return id; }
113        public String getData() { return data; }
114
115        @Override
116        public String toString() {
117            return "ProcessingTask{id='" + id + "', data='" + data + "'}";
118        }
119    }
120
121    public static class ProcessingResult {
122        private final String taskId;
123        private final String result;
124        private final long completedAt;
125
126        public ProcessingResult(String taskId, String result, long completedAt) {
127            this.taskId = taskId;
128            this.result = result;
129            this.completedAt = completedAt;
130        }
131
132        @Override
133        public String toString() {
134            return "ProcessingResult{taskId='" + taskId + "', result='" + result + "'}";
135        }
136    }
137
138    public static class ProcessingException extends Exception {
139        public ProcessingException(String message) {
140            super(message);
141        }
142    }
143
144    public void shutdown() {
145        producerExecutor.shutdown();
146        consumerExecutor.shutdown();
147    }
148}

โœ… Pros and Cons

โœ… Advantages:

  • Decoupling: Producers and consumers operate independently
  • Scalability: Easy to add more producers or consumers
  • Buffering: Queue provides buffer for rate differences
  • Fault Tolerance: Failure in one component doesn’t affect others

โŒ Disadvantages:

  • Memory Usage: Queue can consume significant memory
  • Complexity: Requires careful thread management
  • Backpressure: Need to handle queue overflow scenarios
  • Latency: Queuing introduces processing delays

๐ŸŽฏ Use Cases:

  • Web server request processing
  • Log processing systems
  • Message queue implementations
  • Data streaming applications

๐Ÿ‘๏ธ Observer Pattern

๐Ÿ“‹ Pattern Overview

The Observer pattern allows objects to notify multiple observers about state changes, particularly useful in concurrent environments for event-driven architectures.

graph TD
    A[Subject/Observable] --> B[Observer 1]
    A --> C[Observer 2]
    A --> D[Observer 3]
    A --> E[Observer N]

    F[State Change] --> G[notifyObservers()]
    G --> H[Concurrent Notification]

    H --> I[Observer Thread 1]
    H --> J[Observer Thread 2]
    H --> K[Observer Thread 3]

    style A fill:#ff6b6b
    style H fill:#4ecdc4
    style F fill:#feca57

๐Ÿ› ๏ธ Implementation with Thread Interfaces

1. Concurrent Observer with Runnable:

  1public class ConcurrentObserverPattern<T> {
  2
  3    private final List<Observer<T>> observers = new CopyOnWriteArrayList<>();
  4    private final ExecutorService notificationExecutor;
  5    private final AtomicReference<T> currentState = new AtomicReference<>();
  6
  7    public ConcurrentObserverPattern(int threadPoolSize) {
  8        this.notificationExecutor = Executors.newFixedThreadPool(threadPoolSize);
  9    }
 10
 11    // Observer interface
 12    public interface Observer<T> {
 13        void onStateChanged(T newState, T oldState);
 14        String getObserverId();
 15    }
 16
 17    // Observable subject methods
 18    public void addObserver(Observer<T> observer) {
 19        observers.add(observer);
 20        System.out.println("Added observer: " + observer.getObserverId());
 21    }
 22
 23    public void removeObserver(Observer<T> observer) {
 24        observers.remove(observer);
 25        System.out.println("Removed observer: " + observer.getObserverId());
 26    }
 27
 28    public void setState(T newState) {
 29        T oldState = currentState.getAndSet(newState);
 30        notifyObservers(newState, oldState);
 31    }
 32
 33    public T getState() {
 34        return currentState.get();
 35    }
 36
 37    // Concurrent notification using Runnable
 38    private void notifyObservers(T newState, T oldState) {
 39        for (Observer<T> observer : observers) {
 40            // Each observer notification runs in its own thread
 41            notificationExecutor.submit(new ObserverNotificationTask(observer, newState, oldState));
 42        }
 43    }
 44
 45    // Runnable implementation for observer notifications
 46    private class ObserverNotificationTask implements Runnable {
 47        private final Observer<T> observer;
 48        private final T newState;
 49        private final T oldState;
 50
 51        public ObserverNotificationTask(Observer<T> observer, T newState, T oldState) {
 52            this.observer = observer;
 53            this.newState = newState;
 54            this.oldState = oldState;
 55        }
 56
 57        @Override
 58        public void run() {
 59            try {
 60                long startTime = System.nanoTime();
 61                observer.onStateChanged(newState, oldState);
 62                long duration = System.nanoTime() - startTime;
 63
 64                System.out.println("Observer " + observer.getObserverId() +
 65                    " notified in " + duration / 1_000_000 + "ms");
 66
 67            } catch (Exception e) {
 68                System.err.println("Error notifying observer " + observer.getObserverId() + ": " + e.getMessage());
 69            }
 70        }
 71    }
 72
 73    // Example observer implementations
 74    public static class LoggingObserver<T> implements Observer<T> {
 75        private final String id;
 76
 77        public LoggingObserver(String id) {
 78            this.id = id;
 79        }
 80
 81        @Override
 82        public void onStateChanged(T newState, T oldState) {
 83            // Simulate processing time
 84            try {
 85                Thread.sleep(50);
 86            } catch (InterruptedException e) {
 87                Thread.currentThread().interrupt();
 88            }
 89
 90            System.out.println("[" + id + "] State changed from " + oldState + " to " + newState);
 91        }
 92
 93        @Override
 94        public String getObserverId() {
 95            return id;
 96        }
 97    }
 98
 99    public static class MetricsObserver<T> implements Observer<T> {
100        private final String id;
101        private final AtomicLong changeCount = new AtomicLong(0);
102
103        public MetricsObserver(String id) {
104            this.id = id;
105        }
106
107        @Override
108        public void onStateChanged(T newState, T oldState) {
109            long count = changeCount.incrementAndGet();
110            System.out.println("[" + id + "] Recorded state change #" + count);
111        }
112
113        @Override
114        public String getObserverId() {
115            return id;
116        }
117
118        public long getChangeCount() {
119            return changeCount.get();
120        }
121    }
122
123    public void shutdown() {
124        notificationExecutor.shutdown();
125        try {
126            if (!notificationExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
127                notificationExecutor.shutdownNow();
128            }
129        } catch (InterruptedException e) {
130            notificationExecutor.shutdownNow();
131        }
132    }
133}

2. Event-Driven Observer with Callable:

  1public class EventDrivenObserverPattern {
  2
  3    private final Map<Class<?>, List<EventHandler<?>>> handlerMap = new ConcurrentHashMap<>();
  4    private final ExecutorService eventProcessorExecutor;
  5    private final CompletionService<EventProcessingResult> completionService;
  6
  7    public EventDrivenObserverPattern(int threadPoolSize) {
  8        this.eventProcessorExecutor = Executors.newFixedThreadPool(threadPoolSize);
  9        this.completionService = new ExecutorCompletionService<>(eventProcessorExecutor);
 10    }
 11
 12    // Event handler interface
 13    public interface EventHandler<E> {
 14        EventProcessingResult handleEvent(E event) throws Exception;
 15        String getHandlerId();
 16    }
 17
 18    // Register event handlers
 19    @SuppressWarnings("unchecked")
 20    public <E> void registerHandler(Class<E> eventType, EventHandler<E> handler) {
 21        handlerMap.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>()).add(handler);
 22        System.out.println("Registered handler " + handler.getHandlerId() + " for event type " + eventType.getSimpleName());
 23    }
 24
 25    // Publish event and return futures for all processing results
 26    @SuppressWarnings("unchecked")
 27    public <E> List<Future<EventProcessingResult>> publishEvent(E event) {
 28        List<Future<EventProcessingResult>> futures = new ArrayList<>();
 29        List<EventHandler<?>> handlers = handlerMap.get(event.getClass());
 30
 31        if (handlers != null) {
 32            for (EventHandler<?> handler : handlers) {
 33                EventHandler<E> typedHandler = (EventHandler<E>) handler;
 34                Future<EventProcessingResult> future = completionService.submit(
 35                    new EventProcessingCallable<>(event, typedHandler)
 36                );
 37                futures.add(future);
 38            }
 39        }
 40
 41        return futures;
 42    }
 43
 44    // Callable implementation for event processing
 45    private static class EventProcessingCallable<E> implements Callable<EventProcessingResult> {
 46        private final E event;
 47        private final EventHandler<E> handler;
 48
 49        public EventProcessingCallable(E event, EventHandler<E> handler) {
 50            this.event = event;
 51            this.handler = handler;
 52        }
 53
 54        @Override
 55        public EventProcessingResult call() throws Exception {
 56            long startTime = System.currentTimeMillis();
 57
 58            try {
 59                EventProcessingResult result = handler.handleEvent(event);
 60                long duration = System.currentTimeMillis() - startTime;
 61
 62                return new EventProcessingResult(
 63                    handler.getHandlerId(),
 64                    event.getClass().getSimpleName(),
 65                    true,
 66                    duration,
 67                    result.getMessage()
 68                );
 69
 70            } catch (Exception e) {
 71                long duration = System.currentTimeMillis() - startTime;
 72
 73                return new EventProcessingResult(
 74                    handler.getHandlerId(),
 75                    event.getClass().getSimpleName(),
 76                    false,
 77                    duration,
 78                    "Error: " + e.getMessage()
 79                );
 80            }
 81        }
 82    }
 83
 84    // Event processing result
 85    public static class EventProcessingResult {
 86        private final String handlerId;
 87        private final String eventType;
 88        private final boolean success;
 89        private final long processingTimeMs;
 90        private final String message;
 91
 92        public EventProcessingResult(String handlerId, String eventType, boolean success,
 93                                   long processingTimeMs, String message) {
 94            this.handlerId = handlerId;
 95            this.eventType = eventType;
 96            this.success = success;
 97            this.processingTimeMs = processingTimeMs;
 98            this.message = message;
 99        }
100
101        @Override
102        public String toString() {
103            return String.format("EventProcessingResult{handler='%s', event='%s', success=%s, time=%dms, message='%s'}",
104                handlerId, eventType, success, processingTimeMs, message);
105        }
106
107        // Getters
108        public String getHandlerId() { return handlerId; }
109        public String getEventType() { return eventType; }
110        public boolean isSuccess() { return success; }
111        public long getProcessingTimeMs() { return processingTimeMs; }
112        public String getMessage() { return message; }
113    }
114
115    // Example events and handlers
116    public static class UserRegisteredEvent {
117        private final String userId;
118        private final String email;
119        private final long timestamp;
120
121        public UserRegisteredEvent(String userId, String email) {
122            this.userId = userId;
123            this.email = email;
124            this.timestamp = System.currentTimeMillis();
125        }
126
127        public String getUserId() { return userId; }
128        public String getEmail() { return email; }
129        public long getTimestamp() { return timestamp; }
130    }
131
132    public static class EmailNotificationHandler implements EventHandler<UserRegisteredEvent> {
133        private final String handlerId;
134
135        public EmailNotificationHandler(String handlerId) {
136            this.handlerId = handlerId;
137        }
138
139        @Override
140        public EventProcessingResult handleEvent(UserRegisteredEvent event) throws Exception {
141            // Simulate email sending
142            Thread.sleep(200);
143
144            if (Math.random() < 0.9) { // 90% success rate
145                return new EventProcessingResult(
146                    handlerId,
147                    "UserRegisteredEvent",
148                    true,
149                    200,
150                    "Welcome email sent to " + event.getEmail()
151                );
152            } else {
153                throw new Exception("Email service unavailable");
154            }
155        }
156
157        @Override
158        public String getHandlerId() {
159            return handlerId;
160        }
161    }
162
163    public static class AuditLogHandler implements EventHandler<UserRegisteredEvent> {
164        private final String handlerId;
165
166        public AuditLogHandler(String handlerId) {
167            this.handlerId = handlerId;
168        }
169
170        @Override
171        public EventProcessingResult handleEvent(UserRegisteredEvent event) throws Exception {
172            // Simulate audit logging
173            Thread.sleep(50);
174
175            return new EventProcessingResult(
176                handlerId,
177                "UserRegisteredEvent",
178                true,
179                50,
180                "Audit log created for user " + event.getUserId()
181            );
182        }
183
184        @Override
185        public String getHandlerId() {
186            return handlerId;
187        }
188    }
189
190    public void shutdown() {
191        eventProcessorExecutor.shutdown();
192        try {
193            if (!eventProcessorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
194                eventProcessorExecutor.shutdownNow();
195            }
196        } catch (InterruptedException e) {
197            eventProcessorExecutor.shutdownNow();
198        }
199    }
200}

โœ… Pros and Cons

โœ… Advantages:

  • Loose Coupling: Subjects don’t need to know about specific observers
  • Dynamic Relationships: Can add/remove observers at runtime
  • Concurrent Processing: Multiple observers can process events simultaneously
  • Scalability: Easy to add new event types and handlers

โŒ Disadvantages:

  • Performance Overhead: Thread creation and context switching
  • Memory Leaks: Observers may not be properly cleaned up
  • Ordering Issues: No guarantee of notification order
  • Error Handling: Failed observers can impact system performance

๐ŸŽฏ Use Cases:

  • UI event handling systems
  • Microservice event broadcasting
  • Real-time monitoring systems
  • Publish-subscribe message patterns

โšก Command Pattern

๐Ÿ“‹ Pattern Overview

The Command pattern encapsulates requests as objects, allowing for queuing, logging, and undo operations. In concurrent environments, it’s excellent for task scheduling and execution.

graph TD
    A[Command Interface] --> B[ConcreteCommand1]
    A --> C[ConcreteCommand2]
    A --> D[ConcreteCommand3]

    E[Invoker/Executor] --> F[CommandQueue]
    F --> G[ExecutorService]

    G --> H[Worker Thread 1]
    G --> I[Worker Thread 2]
    G --> J[Worker Thread 3]

    H --> K[execute()]
    I --> K
    J --> K

    style A fill:#ff6b6b
    style G fill:#4ecdc4
    style K fill:#feca57

๐Ÿ› ๏ธ Implementation with Thread Interfaces

1. Concurrent Command Executor:

  1public class ConcurrentCommandPattern {
  2
  3    private final ExecutorService commandExecutor;
  4    private final BlockingQueue<CommandExecution> executionQueue;
  5    private final Map<String, CommandResult> results = new ConcurrentHashMap<>();
  6    private final AtomicBoolean shutdown = new AtomicBoolean(false);
  7
  8    public ConcurrentCommandPattern(int threadPoolSize, int queueCapacity) {
  9        this.commandExecutor = Executors.newFixedThreadPool(threadPoolSize);
 10        this.executionQueue = new ArrayBlockingQueue<>(queueCapacity);
 11
 12        // Start result collector thread
 13        Thread resultCollector = new Thread(new ResultCollector());
 14        resultCollector.setDaemon(true);
 15        resultCollector.start();
 16    }
 17
 18    // Command interface
 19    public interface Command {
 20        CommandResult execute() throws Exception;
 21        String getCommandId();
 22        String getDescription();
 23        boolean isUndoable();
 24        void undo() throws Exception;
 25    }
 26
 27    // Command result wrapper
 28    public static class CommandResult {
 29        private final String commandId;
 30        private final boolean success;
 31        private final Object result;
 32        private final Exception exception;
 33        private final long executionTimeMs;
 34
 35        public CommandResult(String commandId, boolean success, Object result,
 36                           Exception exception, long executionTimeMs) {
 37            this.commandId = commandId;
 38            this.success = success;
 39            this.result = result;
 40            this.exception = exception;
 41            this.executionTimeMs = executionTimeMs;
 42        }
 43
 44        // Factory methods
 45        public static CommandResult success(String commandId, Object result, long executionTimeMs) {
 46            return new CommandResult(commandId, true, result, null, executionTimeMs);
 47        }
 48
 49        public static CommandResult failure(String commandId, Exception exception, long executionTimeMs) {
 50            return new CommandResult(commandId, false, null, exception, executionTimeMs);
 51        }
 52
 53        // Getters
 54        public String getCommandId() { return commandId; }
 55        public boolean isSuccess() { return success; }
 56        public Object getResult() { return result; }
 57        public Exception getException() { return exception; }
 58        public long getExecutionTimeMs() { return executionTimeMs; }
 59
 60        @Override
 61        public String toString() {
 62            return String.format("CommandResult{id='%s', success=%s, time=%dms, result=%s}",
 63                commandId, success, executionTimeMs,
 64                success ? result : (exception != null ? exception.getMessage() : "null"));
 65        }
 66    }
 67
 68    // Command execution wrapper
 69    private static class CommandExecution {
 70        private final Command command;
 71        private final CompletableFuture<CommandResult> future;
 72
 73        public CommandExecution(Command command, CompletableFuture<CommandResult> future) {
 74            this.command = command;
 75            this.future = future;
 76        }
 77
 78        public Command getCommand() { return command; }
 79        public CompletableFuture<CommandResult> getFuture() { return future; }
 80    }
 81
 82    // Execute command asynchronously
 83    public CompletableFuture<CommandResult> executeAsync(Command command) {
 84        CompletableFuture<CommandResult> future = new CompletableFuture<>();
 85
 86        commandExecutor.submit(new CommandExecutionTask(command, future));
 87
 88        return future;
 89    }
 90
 91    // Execute command synchronously with timeout
 92    public CommandResult executeSync(Command command, long timeoutMs)
 93            throws InterruptedException, ExecutionException, TimeoutException {
 94        CompletableFuture<CommandResult> future = executeAsync(command);
 95        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
 96    }
 97
 98    // Command execution Runnable
 99    private class CommandExecutionTask implements Runnable {
100        private final Command command;
101        private final CompletableFuture<CommandResult> future;
102
103        public CommandExecutionTask(Command command, CompletableFuture<CommandResult> future) {
104            this.command = command;
105            this.future = future;
106        }
107
108        @Override
109        public void run() {
110            long startTime = System.currentTimeMillis();
111
112            try {
113                System.out.println("Executing command: " + command.getDescription());
114                CommandResult result = command.execute();
115
116                long duration = System.currentTimeMillis() - startTime;
117                CommandResult timedResult = new CommandResult(
118                    result.getCommandId(),
119                    result.isSuccess(),
120                    result.getResult(),
121                    result.getException(),
122                    duration
123                );
124
125                future.complete(timedResult);
126
127                // Add to execution queue for result collection
128                try {
129                    executionQueue.offer(new CommandExecution(command, CompletableFuture.completedFuture(timedResult)),
130                                       1, TimeUnit.SECONDS);
131                } catch (InterruptedException e) {
132                    Thread.currentThread().interrupt();
133                }
134
135            } catch (Exception e) {
136                long duration = System.currentTimeMillis() - startTime;
137                CommandResult errorResult = CommandResult.failure(command.getCommandId(), e, duration);
138                future.complete(errorResult);
139            }
140        }
141    }
142
143    // Result collector Runnable
144    private class ResultCollector implements Runnable {
145        @Override
146        public void run() {
147            while (!shutdown.get()) {
148                try {
149                    CommandExecution execution = executionQueue.poll(1, TimeUnit.SECONDS);
150                    if (execution != null) {
151                        CommandResult result = execution.getFuture().get();
152                        results.put(result.getCommandId(), result);
153
154                        System.out.println("Collected result: " + result);
155                    }
156                } catch (InterruptedException e) {
157                    Thread.currentThread().interrupt();
158                    break;
159                } catch (ExecutionException e) {
160                    System.err.println("Error collecting result: " + e.getCause());
161                }
162            }
163        }
164    }
165
166    // Get command result
167    public CommandResult getResult(String commandId) {
168        return results.get(commandId);
169    }
170
171    // Get all results
172    public Map<String, CommandResult> getAllResults() {
173        return new HashMap<>(results);
174    }
175
176    // Concrete command implementations
177    public static class FileProcessingCommand implements Command {
178        private final String commandId;
179        private final String filePath;
180        private final String operation;
181
182        public FileProcessingCommand(String commandId, String filePath, String operation) {
183            this.commandId = commandId;
184            this.filePath = filePath;
185            this.operation = operation;
186        }
187
188        @Override
189        public CommandResult execute() throws Exception {
190            // Simulate file processing
191            Thread.sleep((long)(Math.random() * 1000) + 500);
192
193            if (Math.random() < 0.1) { // 10% failure rate
194                throw new Exception("File processing failed for: " + filePath);
195            }
196
197            String result = String.format("File %s processed with operation: %s", filePath, operation);
198            return CommandResult.success(commandId, result, 0);
199        }
200
201        @Override
202        public String getCommandId() {
203            return commandId;
204        }
205
206        @Override
207        public String getDescription() {
208            return String.format("FileProcessing[%s]: %s on %s", commandId, operation, filePath);
209        }
210
211        @Override
212        public boolean isUndoable() {
213            return "DELETE".equalsIgnoreCase(operation);
214        }
215
216        @Override
217        public void undo() throws Exception {
218            if (isUndoable()) {
219                System.out.println("Undoing delete operation for: " + filePath);
220                // Simulate restore operation
221                Thread.sleep(200);
222            } else {
223                throw new UnsupportedOperationException("Cannot undo operation: " + operation);
224            }
225        }
226    }
227
228    public static class DatabaseCommand implements Command {
229        private final String commandId;
230        private final String sql;
231        private final String operation;
232
233        public DatabaseCommand(String commandId, String sql, String operation) {
234            this.commandId = commandId;
235            this.sql = sql;
236            this.operation = operation;
237        }
238
239        @Override
240        public CommandResult execute() throws Exception {
241            // Simulate database operation
242            Thread.sleep((long)(Math.random() * 500) + 100);
243
244            if (sql.contains("DROP") && Math.random() < 0.3) {
245                throw new Exception("Database operation failed: " + sql);
246            }
247
248            String result = String.format("Database %s executed: %s", operation, sql);
249            return CommandResult.success(commandId, result, 0);
250        }
251
252        @Override
253        public String getCommandId() {
254            return commandId;
255        }
256
257        @Override
258        public String getDescription() {
259            return String.format("Database[%s]: %s", commandId, sql);
260        }
261
262        @Override
263        public boolean isUndoable() {
264            return operation.equalsIgnoreCase("INSERT") || operation.equalsIgnoreCase("UPDATE");
265        }
266
267        @Override
268        public void undo() throws Exception {
269            if (isUndoable()) {
270                System.out.println("Undoing database operation: " + sql);
271                Thread.sleep(150);
272            } else {
273                throw new UnsupportedOperationException("Cannot undo database operation: " + operation);
274            }
275        }
276    }
277
278    public void shutdown() {
279        shutdown.set(true);
280        commandExecutor.shutdown();
281
282        try {
283            if (!commandExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
284                commandExecutor.shutdownNow();
285            }
286        } catch (InterruptedException e) {
287            commandExecutor.shutdownNow();
288        }
289    }
290}

โœ… Pros and Cons

โœ… Advantages:

  • Decoupling: Invoker doesn’t need to know command implementation details
  • Queuing: Commands can be queued, scheduled, and batch processed
  • Undo Support: Easy to implement undo/redo functionality
  • Logging: Command execution can be logged and audited
  • Macro Commands: Can combine multiple commands into composite operations

โŒ Disadvantages:

  • Complexity: Adds layers of abstraction
  • Memory Usage: Each command is an object consuming memory
  • Performance: Object creation overhead for simple operations
  • Thread Safety: Commands must be thread-safe if shared

๐ŸŽฏ Use Cases:

  • Task scheduling systems
  • Undo/redo functionality
  • Batch processing frameworks
  • Request queuing in web applications
  • Database transaction management

๐ŸŽ›๏ธ Strategy Pattern

๐Ÿ“‹ Pattern Overview

The Strategy pattern defines a family of algorithms, encapsulates each one, and makes them interchangeable. In concurrent programming, different strategies can be executed in parallel.

๐Ÿ› ๏ธ Implementation with Thread Interfaces

1. Concurrent Strategy Execution:

  1public class ConcurrentStrategyPattern {
  2
  3    private final ExecutorService strategyExecutor;
  4    private final CompletionService<StrategyResult> completionService;
  5
  6    public ConcurrentStrategyPattern(int threadPoolSize) {
  7        this.strategyExecutor = Executors.newFixedThreadPool(threadPoolSize);
  8        this.completionService = new ExecutorCompletionService<>(strategyExecutor);
  9    }
 10
 11    // Strategy interface
 12    public interface Strategy<T, R> {
 13        R execute(T input) throws Exception;
 14        String getStrategyName();
 15        int getPriority(); // Higher number = higher priority
 16        long getEstimatedExecutionTime();
 17    }
 18
 19    // Strategy execution context
 20    public static class StrategyContext<T> {
 21        private final T input;
 22        private final List<Strategy<T, ?>> strategies;
 23        private final ExecutionMode mode;
 24
 25        public enum ExecutionMode {
 26            PARALLEL,      // Execute all strategies in parallel
 27            RACE,          // Return first successful result
 28            FALLBACK,      // Try strategies in priority order until one succeeds
 29            BEST_EFFORT    // Execute all and return best result based on criteria
 30        }
 31
 32        public StrategyContext(T input, List<Strategy<T, ?>> strategies, ExecutionMode mode) {
 33            this.input = input;
 34            this.strategies = new ArrayList<>(strategies);
 35            this.mode = mode;
 36        }
 37
 38        public T getInput() { return input; }
 39        public List<Strategy<T, ?>> getStrategies() { return strategies; }
 40        public ExecutionMode getMode() { return mode; }
 41    }
 42
 43    // Strategy execution result
 44    public static class StrategyResult {
 45        private final String strategyName;
 46        private final Object result;
 47        private final boolean success;
 48        private final Exception exception;
 49        private final long executionTimeMs;
 50
 51        public StrategyResult(String strategyName, Object result, boolean success,
 52                            Exception exception, long executionTimeMs) {
 53            this.strategyName = strategyName;
 54            this.result = result;
 55            this.success = success;
 56            this.exception = exception;
 57            this.executionTimeMs = executionTimeMs;
 58        }
 59
 60        public static StrategyResult success(String strategyName, Object result, long executionTimeMs) {
 61            return new StrategyResult(strategyName, result, true, null, executionTimeMs);
 62        }
 63
 64        public static StrategyResult failure(String strategyName, Exception exception, long executionTimeMs) {
 65            return new StrategyResult(strategyName, null, false, exception, executionTimeMs);
 66        }
 67
 68        // Getters
 69        public String getStrategyName() { return strategyName; }
 70        public Object getResult() { return result; }
 71        public boolean isSuccess() { return success; }
 72        public Exception getException() { return exception; }
 73        public long getExecutionTimeMs() { return executionTimeMs; }
 74
 75        @Override
 76        public String toString() {
 77            return String.format("StrategyResult{strategy='%s', success=%s, time=%dms, result=%s}",
 78                strategyName, success, executionTimeMs,
 79                success ? result : (exception != null ? exception.getMessage() : "null"));
 80        }
 81    }
 82
 83    // Execute strategies based on context
 84    public <T> CompletableFuture<List<StrategyResult>> executeStrategies(StrategyContext<T> context) {
 85        switch (context.getMode()) {
 86            case PARALLEL:
 87                return executeParallel(context);
 88            case RACE:
 89                return executeRace(context);
 90            case FALLBACK:
 91                return executeFallback(context);
 92            case BEST_EFFORT:
 93                return executeBestEffort(context);
 94            default:
 95                throw new IllegalArgumentException("Unsupported execution mode: " + context.getMode());
 96        }
 97    }
 98
 99    // Execute all strategies in parallel
100    @SuppressWarnings("unchecked")
101    private <T> CompletableFuture<List<StrategyResult>> executeParallel(StrategyContext<T> context) {
102        List<CompletableFuture<StrategyResult>> futures = new ArrayList<>();
103
104        for (Strategy<T, ?> strategy : context.getStrategies()) {
105            CompletableFuture<StrategyResult> future = CompletableFuture.supplyAsync(() -> {
106                return executeStrategy((Strategy<T, Object>) strategy, context.getInput());
107            }, strategyExecutor);
108
109            futures.add(future);
110        }
111
112        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
113            .thenApply(v -> futures.stream()
114                .map(CompletableFuture::join)
115                .collect(Collectors.toList()));
116    }
117
118    // Execute strategies in race mode (first successful result wins)
119    @SuppressWarnings("unchecked")
120    private <T> CompletableFuture<List<StrategyResult>> executeRace(StrategyContext<T> context) {
121        CompletableFuture<StrategyResult> raceFuture = new CompletableFuture<>();
122        AtomicInteger completedCount = new AtomicInteger(0);
123        List<StrategyResult> allResults = new CopyOnWriteArrayList<>();
124
125        for (Strategy<T, ?> strategy : context.getStrategies()) {
126            CompletableFuture.supplyAsync(() -> {
127                return executeStrategy((Strategy<T, Object>) strategy, context.getInput());
128            }, strategyExecutor)
129            .whenComplete((result, throwable) -> {
130                allResults.add(result);
131
132                // Complete race future with first successful result
133                if (result.isSuccess() && !raceFuture.isDone()) {
134                    raceFuture.complete(result);
135                }
136
137                // If all strategies completed and none succeeded, complete with last result
138                if (completedCount.incrementAndGet() == context.getStrategies().size() && !raceFuture.isDone()) {
139                    raceFuture.complete(result);
140                }
141            });
142        }
143
144        return raceFuture.thenApply(winningResult -> {
145            System.out.println("Race winner: " + winningResult.getStrategyName());
146            return allResults;
147        });
148    }
149
150    // Execute strategies in fallback mode (priority order until success)
151    @SuppressWarnings("unchecked")
152    private <T> CompletableFuture<List<StrategyResult>> executeFallback(StrategyContext<T> context) {
153        List<Strategy<T, ?>> sortedStrategies = context.getStrategies().stream()
154            .sorted((s1, s2) -> Integer.compare(s2.getPriority(), s1.getPriority()))
155            .collect(Collectors.toList());
156
157        return CompletableFuture.supplyAsync(() -> {
158            List<StrategyResult> results = new ArrayList<>();
159
160            for (Strategy<T, ?> strategy : sortedStrategies) {
161                StrategyResult result = executeStrategy((Strategy<T, Object>) strategy, context.getInput());
162                results.add(result);
163
164                if (result.isSuccess()) {
165                    System.out.println("Fallback success with strategy: " + strategy.getStrategyName());
166                    break;
167                }
168            }
169
170            return results;
171        }, strategyExecutor);
172    }
173
174    // Execute strategies and return best result based on execution time and success
175    @SuppressWarnings("unchecked")
176    private <T> CompletableFuture<List<StrategyResult>> executeBestEffort(StrategyContext<T> context) {
177        return executeParallel(context).thenApply(results -> {
178            StrategyResult bestResult = results.stream()
179                .filter(StrategyResult::isSuccess)
180                .min(Comparator.comparing(StrategyResult::getExecutionTimeMs))
181                .orElse(results.get(0));
182
183            System.out.println("Best effort result: " + bestResult.getStrategyName() +
184                " (time: " + bestResult.getExecutionTimeMs() + "ms)");
185
186            return results;
187        });
188    }
189
190    // Execute single strategy
191    private <T> StrategyResult executeStrategy(Strategy<T, Object> strategy, T input) {
192        long startTime = System.currentTimeMillis();
193
194        try {
195            Object result = strategy.execute(input);
196            long duration = System.currentTimeMillis() - startTime;
197
198            return StrategyResult.success(strategy.getStrategyName(), result, duration);
199
200        } catch (Exception e) {
201            long duration = System.currentTimeMillis() - startTime;
202            return StrategyResult.failure(strategy.getStrategyName(), e, duration);
203        }
204    }
205
206    // Example strategy implementations for data processing
207    public static class FastProcessingStrategy implements Strategy<String, String> {
208        @Override
209        public String execute(String input) throws Exception {
210            Thread.sleep(100); // Fast processing
211            return "Fast result: " + input.toUpperCase();
212        }
213
214        @Override
215        public String getStrategyName() {
216            return "FastProcessing";
217        }
218
219        @Override
220        public int getPriority() {
221            return 3;
222        }
223
224        @Override
225        public long getEstimatedExecutionTime() {
226            return 100;
227        }
228    }
229
230    public static class AccurateProcessingStrategy implements Strategy<String, String> {
231        @Override
232        public String execute(String input) throws Exception {
233            Thread.sleep(500); // Slower but more accurate
234
235            if (Math.random() < 0.1) {
236                throw new Exception("Processing failed");
237            }
238
239            return "Accurate result: " + input.toLowerCase().replace(" ", "_");
240        }
241
242        @Override
243        public String getStrategyName() {
244            return "AccurateProcessing";
245        }
246
247        @Override
248        public int getPriority() {
249            return 2;
250        }
251
252        @Override
253        public long getEstimatedExecutionTime() {
254            return 500;
255        }
256    }
257
258    public static class FallbackProcessingStrategy implements Strategy<String, String> {
259        @Override
260        public String execute(String input) throws Exception {
261            Thread.sleep(50); // Very fast fallback
262            return "Fallback result: " + input;
263        }
264
265        @Override
266        public String getStrategyName() {
267            return "FallbackProcessing";
268        }
269
270        @Override
271        public int getPriority() {
272            return 1; // Lowest priority
273        }
274
275        @Override
276        public long getEstimatedExecutionTime() {
277            return 50;
278        }
279    }
280
281    public void shutdown() {
282        strategyExecutor.shutdown();
283        try {
284            if (!strategyExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
285                strategyExecutor.shutdownNow();
286            }
287        } catch (InterruptedException e) {
288            strategyExecutor.shutdownNow();
289        }
290    }
291}

โœ… Pros and Cons

โœ… Advantages:

  • Flexibility: Easy to add new strategies without modifying existing code
  • Performance: Can execute strategies in parallel for faster results
  • Resilience: Fallback strategies provide fault tolerance
  • Optimization: Can choose best strategy based on runtime conditions

โŒ Disadvantages:

  • Complexity: Multiple execution modes add complexity
  • Resource Usage: Parallel execution consumes more resources
  • Coordination Overhead: Managing multiple strategy executions
  • Decision Logic: Choosing the right execution mode can be complex

๐ŸŽฏ Use Cases:

  • Algorithm selection based on data characteristics
  • Load balancing across different service endpoints
  • Fault-tolerant processing with fallback options
  • Performance optimization through parallel execution

๐Ÿข Enterprise Patterns

๐Ÿ“‹ Thread Pool Manager Pattern

  1public class EnterpriseThreadPoolManager {
  2
  3    private final Map<String, ExecutorService> threadPools = new ConcurrentHashMap<>();
  4    private final Map<String, ThreadPoolMetrics> metrics = new ConcurrentHashMap<>();
  5    private final ScheduledExecutorService monitoringExecutor;
  6
  7    public EnterpriseThreadPoolManager() {
  8        this.monitoringExecutor = Executors.newScheduledThreadPool(1);
  9        startMonitoring();
 10    }
 11
 12    // Thread pool configuration
 13    public static class ThreadPoolConfig {
 14        private final String poolName;
 15        private final int corePoolSize;
 16        private final int maximumPoolSize;
 17        private final long keepAliveTime;
 18        private final TimeUnit timeUnit;
 19        private final int queueCapacity;
 20        private final boolean allowCoreThreadTimeOut;
 21
 22        public ThreadPoolConfig(String poolName, int corePoolSize, int maximumPoolSize,
 23                              long keepAliveTime, TimeUnit timeUnit, int queueCapacity,
 24                              boolean allowCoreThreadTimeOut) {
 25            this.poolName = poolName;
 26            this.corePoolSize = corePoolSize;
 27            this.maximumPoolSize = maximumPoolSize;
 28            this.keepAliveTime = keepAliveTime;
 29            this.timeUnit = timeUnit;
 30            this.queueCapacity = queueCapacity;
 31            this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
 32        }
 33
 34        // Getters
 35        public String getPoolName() { return poolName; }
 36        public int getCorePoolSize() { return corePoolSize; }
 37        public int getMaximumPoolSize() { return maximumPoolSize; }
 38        public long getKeepAliveTime() { return keepAliveTime; }
 39        public TimeUnit getTimeUnit() { return timeUnit; }
 40        public int getQueueCapacity() { return queueCapacity; }
 41        public boolean isAllowCoreThreadTimeOut() { return allowCoreThreadTimeOut; }
 42    }
 43
 44    // Create thread pool with configuration
 45    public void createThreadPool(ThreadPoolConfig config) {
 46        ThreadPoolExecutor executor = new ThreadPoolExecutor(
 47            config.getCorePoolSize(),
 48            config.getMaximumPoolSize(),
 49            config.getKeepAliveTime(),
 50            config.getTimeUnit(),
 51            new ArrayBlockingQueue<>(config.getQueueCapacity()),
 52            new NamedThreadFactory(config.getPoolName()),
 53            new ThreadPoolExecutor.CallerRunsPolicy()
 54        );
 55
 56        executor.allowCoreThreadTimeOut(config.isAllowCoreThreadTimeOut());
 57
 58        threadPools.put(config.getPoolName(), executor);
 59        metrics.put(config.getPoolName(), new ThreadPoolMetrics(config.getPoolName()));
 60
 61        System.out.println("Created thread pool: " + config.getPoolName());
 62    }
 63
 64    // Get thread pool
 65    public ExecutorService getThreadPool(String poolName) {
 66        return threadPools.get(poolName);
 67    }
 68
 69    // Submit task to specific pool
 70    public <T> Future<T> submitTask(String poolName, Callable<T> task) {
 71        ExecutorService executor = threadPools.get(poolName);
 72        if (executor == null) {
 73            throw new IllegalArgumentException("Thread pool not found: " + poolName);
 74        }
 75
 76        ThreadPoolMetrics poolMetrics = metrics.get(poolName);
 77        poolMetrics.incrementSubmitted();
 78
 79        return executor.submit(() -> {
 80            long startTime = System.currentTimeMillis();
 81            try {
 82                T result = task.call();
 83                long duration = System.currentTimeMillis() - startTime;
 84                poolMetrics.recordExecution(duration, true);
 85                return result;
 86            } catch (Exception e) {
 87                long duration = System.currentTimeMillis() - startTime;
 88                poolMetrics.recordExecution(duration, false);
 89                throw new RuntimeException(e);
 90            }
 91        });
 92    }
 93
 94    // Thread pool metrics
 95    private static class ThreadPoolMetrics {
 96        private final String poolName;
 97        private final AtomicLong submittedTasks = new AtomicLong(0);
 98        private final AtomicLong completedTasks = new AtomicLong(0);
 99        private final AtomicLong failedTasks = new AtomicLong(0);
100        private final AtomicLong totalExecutionTime = new AtomicLong(0);
101
102        public ThreadPoolMetrics(String poolName) {
103            this.poolName = poolName;
104        }
105
106        public void incrementSubmitted() {
107            submittedTasks.incrementAndGet();
108        }
109
110        public void recordExecution(long durationMs, boolean success) {
111            completedTasks.incrementAndGet();
112            totalExecutionTime.addAndGet(durationMs);
113
114            if (!success) {
115                failedTasks.incrementAndGet();
116            }
117        }
118
119        public String getPoolName() { return poolName; }
120        public long getSubmittedTasks() { return submittedTasks.get(); }
121        public long getCompletedTasks() { return completedTasks.get(); }
122        public long getFailedTasks() { return failedTasks.get(); }
123        public double getAverageExecutionTime() {
124            long completed = completedTasks.get();
125            return completed > 0 ? (double) totalExecutionTime.get() / completed : 0.0;
126        }
127
128        @Override
129        public String toString() {
130            return String.format("ThreadPoolMetrics{pool='%s', submitted=%d, completed=%d, failed=%d, avgTime=%.2fms}",
131                poolName, submittedTasks.get(), completedTasks.get(), failedTasks.get(), getAverageExecutionTime());
132        }
133    }
134
135    // Named thread factory
136    private static class NamedThreadFactory implements ThreadFactory {
137        private final String namePrefix;
138        private final AtomicInteger threadNumber = new AtomicInteger(1);
139
140        public NamedThreadFactory(String namePrefix) {
141            this.namePrefix = namePrefix;
142        }
143
144        @Override
145        public Thread newThread(Runnable r) {
146            Thread thread = new Thread(r, namePrefix + "-thread-" + threadNumber.getAndIncrement());
147            thread.setDaemon(false);
148            return thread;
149        }
150    }
151
152    // Monitoring
153    private void startMonitoring() {
154        monitoringExecutor.scheduleAtFixedRate(() -> {
155            for (Map.Entry<String, ExecutorService> entry : threadPools.entrySet()) {
156                String poolName = entry.getKey();
157                ExecutorService executor = entry.getValue();
158
159                if (executor instanceof ThreadPoolExecutor) {
160                    ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
161                    ThreadPoolMetrics poolMetrics = metrics.get(poolName);
162
163                    System.out.printf("Pool: %s | Active: %d | Queue: %d | Completed: %d | %s%n",
164                        poolName,
165                        tpe.getActiveCount(),
166                        tpe.getQueue().size(),
167                        tpe.getCompletedTaskCount(),
168                        poolMetrics.toString()
169                    );
170                }
171            }
172            System.out.println("---");
173        }, 5, 5, TimeUnit.SECONDS);
174    }
175
176    public void shutdown() {
177        for (ExecutorService executor : threadPools.values()) {
178            executor.shutdown();
179        }
180        monitoringExecutor.shutdown();
181    }
182}

๐Ÿ“Š Pattern Comparison Summary

๐ŸŽฏ When to Use Each Pattern

graph TD
    A[Concurrency Pattern Selection] --> B{Data Flow Type?}

    B -->|Producer-Consumer| C[Producer-Consumer Pattern]
    B -->|Event Driven| D[Observer Pattern]
    B -->|Task Based| E[Command Pattern]
    B -->|Algorithm Choice| F[Strategy Pattern]

    C --> G[High throughput data processing]
    C --> H[Message queue systems]

    D --> I[Event broadcasting]
    D --> J[State change notifications]

    E --> K[Task scheduling]
    E --> L[Undo/redo operations]

    F --> M[Algorithm selection]
    F --> N[Fault tolerance]

    style C fill:#ff6b6b
    style D fill:#4ecdc4
    style E fill:#feca57
    style F fill:#45b7d1
PatternBest ForThread InterfaceComplexityPerformance
Producer-ConsumerData streaming, Queue processingRunnable, BlockingQueueMediumHigh throughput
ObserverEvent systems, State changesRunnable, CompletableFutureMediumMedium latency
CommandTask scheduling, Undo operationsCallable, ExecutorServiceHighVariable
StrategyAlgorithm selection, Fault toleranceCallable, CompletableFutureHighOptimized
Thread Pool ManagerEnterprise resource managementAll interfacesHighHighly optimized

๐ŸŽฏ Conclusion

Design patterns combined with Java thread interfaces provide powerful abstractions for building robust concurrent systems. Each pattern serves specific use cases and offers unique advantages in concurrent environments.

Key principles for success:

  1. Choose the Right Pattern: Match pattern to your specific concurrency needs
  2. Leverage Thread Interfaces: Use Runnable for fire-and-forget, Callable for results
  3. Plan for Failures: Implement proper error handling and fallback strategies
  4. Monitor Performance: Track metrics and optimize based on actual usage
  5. Test Thoroughly: Concurrent patterns require extensive testing under load

Master these patterns and you’ll be equipped to handle complex concurrent programming challenges in enterprise Java applications.