Java Concurrency Deep Dive Part 2: Mastering Runnable, Callable Patterns and Internal Mechanisms

๐ŸŽฏ Introduction

Building upon our comprehensive overview of Java concurrency, this deep dive explores the fundamental building blocks that power Java’s threading mechanisms. We’ll dissect the internals of Runnable and Callable interfaces, examine thread synchronization primitives, understand the Java Memory Model, and explore advanced patterns that form the foundation of robust concurrent applications.

This technical deep dive is essential for developers who want to understand not just how to use Java’s concurrency tools, but how they work under the hood and how to leverage them effectively in complex scenarios.

๐Ÿ”ฌ Runnable Interface: The Foundation of Java Threading

๐Ÿ“‹ Understanding Runnable Internals

The Runnable interface is the most fundamental abstraction for representing a task that can be executed by a thread. Let’s examine its internal structure and behavior patterns.

 1@FunctionalInterface
 2public interface Runnable {
 3    /**
 4     * When an object implementing interface <code>Runnable</code> is used
 5     * to create a thread, starting the thread causes the object's
 6     * <code>run</code> method to be called in that separately executing
 7     * thread.
 8     */
 9    public abstract void run();
10}

๐Ÿ” Deep Dive: Thread Execution Lifecycle

graph TD
    A[Thread.start() called] --> B[Thread enters RUNNABLE state]
    B --> C[Thread Scheduler assigns CPU]
    C --> D[run() method execution begins]
    D --> E{Exception thrown?}
    E -->|No| F[run() completes normally]
    E -->|Yes| G[UncaughtExceptionHandler called]
    F --> H[Thread enters TERMINATED state]
    G --> H

    I[Thread Interruption] --> J{Thread.isInterrupted()}
    J -->|true| K[InterruptedException handling]
    J -->|false| D

    style D fill:#4ecdc4
    style H fill:#ff6b35
    style K fill:#feca57

๐Ÿ› ๏ธ Advanced Runnable Patterns

1. Interruptible Runnable Pattern:

 1public class InterruptibleTask implements Runnable {
 2
 3    private final String taskName;
 4    private final AtomicBoolean cancelled = new AtomicBoolean(false);
 5    private volatile boolean running = false;
 6
 7    public InterruptibleTask(String taskName) {
 8        this.taskName = taskName;
 9    }
10
11    @Override
12    public void run() {
13        Thread currentThread = Thread.currentThread();
14        running = true;
15
16        try {
17            while (!cancelled.get() && !currentThread.isInterrupted()) {
18                // Check interruption status periodically
19                if (currentThread.isInterrupted()) {
20                    System.out.println(taskName + " was interrupted");
21                    break;
22                }
23
24                // Simulate work that can be interrupted
25                performInterruptibleWork();
26
27                // Cooperative interruption check
28                if (Thread.interrupted()) {
29                    System.out.println(taskName + " detected interruption flag");
30                    break;
31                }
32            }
33        } catch (InterruptedException e) {
34            System.out.println(taskName + " interrupted via exception");
35            // Restore interrupt status
36            currentThread.interrupt();
37        } finally {
38            running = false;
39            cleanup();
40        }
41    }
42
43    private void performInterruptibleWork() throws InterruptedException {
44        for (int i = 0; i < 100; i++) {
45            // Simulate CPU-intensive work
46            Math.sqrt(i * Math.PI);
47
48            // Periodic interruption check for CPU-intensive tasks
49            if (i % 10 == 0 && Thread.currentThread().isInterrupted()) {
50                throw new InterruptedException("Work interrupted at iteration " + i);
51            }
52        }
53
54        // Simulate I/O or blocking operation that respects interruption
55        Thread.sleep(50);
56    }
57
58    private void cleanup() {
59        System.out.println(taskName + " performing cleanup");
60        // Release resources, close connections, etc.
61    }
62
63    public void cancel() {
64        cancelled.set(true);
65    }
66
67    public boolean isRunning() {
68        return running;
69    }
70}

2. Parameterized Runnable Pattern:

 1public class ParameterizedRunnable<T> implements Runnable {
 2
 3    private final T parameter;
 4    private final Consumer<T> task;
 5    private final BiConsumer<T, Exception> errorHandler;
 6    private final Consumer<T> completionHandler;
 7
 8    public ParameterizedRunnable(T parameter,
 9                               Consumer<T> task,
10                               BiConsumer<T, Exception> errorHandler,
11                               Consumer<T> completionHandler) {
12        this.parameter = parameter;
13        this.task = task;
14        this.errorHandler = errorHandler;
15        this.completionHandler = completionHandler;
16    }
17
18    @Override
19    public void run() {
20        String threadName = Thread.currentThread().getName();
21        try {
22            System.out.println("Thread " + threadName + " processing: " + parameter);
23            task.accept(parameter);
24
25            if (completionHandler != null) {
26                completionHandler.accept(parameter);
27            }
28
29        } catch (Exception e) {
30            if (errorHandler != null) {
31                errorHandler.accept(parameter, e);
32            } else {
33                System.err.println("Error in thread " + threadName +
34                    " processing " + parameter + ": " + e.getMessage());
35            }
36        }
37    }
38
39    // Factory methods for common patterns
40    public static <T> ParameterizedRunnable<T> withLogging(T parameter, Consumer<T> task) {
41        return new ParameterizedRunnable<>(
42            parameter,
43            task,
44            (param, ex) -> System.err.println("Failed processing " + param + ": " + ex.getMessage()),
45            param -> System.out.println("Completed processing " + param)
46        );
47    }
48
49    public static <T> ParameterizedRunnable<T> withRetry(T parameter,
50                                                       Consumer<T> task,
51                                                       int maxRetries) {
52        return new ParameterizedRunnable<>(
53            parameter,
54            createRetryingTask(task, maxRetries),
55            (param, ex) -> System.err.println("Failed after " + maxRetries + " retries: " + ex.getMessage()),
56            null
57        );
58    }
59
60    private static <T> Consumer<T> createRetryingTask(Consumer<T> originalTask, int maxRetries) {
61        return parameter -> {
62            Exception lastException = null;
63            for (int attempt = 1; attempt <= maxRetries; attempt++) {
64                try {
65                    originalTask.accept(parameter);
66                    return; // Success
67                } catch (Exception e) {
68                    lastException = e;
69                    if (attempt < maxRetries) {
70                        try {
71                            Thread.sleep(100 * attempt); // Exponential backoff
72                        } catch (InterruptedException ie) {
73                            Thread.currentThread().interrupt();
74                            throw new RuntimeException("Interrupted during retry", ie);
75                        }
76                    }
77                }
78            }
79            throw new RuntimeException("Failed after " + maxRetries + " attempts", lastException);
80        };
81    }
82}

3. State-Aware Runnable Pattern:

 1public class StatefulRunnable implements Runnable {
 2
 3    public enum State {
 4        CREATED, RUNNING, PAUSED, CANCELLED, COMPLETED, FAILED
 5    }
 6
 7    private volatile State currentState = State.CREATED;
 8    private final Object stateLock = new Object();
 9    private final CountDownLatch completionLatch = new CountDownLatch(1);
10
11    private final String taskId;
12    private final Runnable actualTask;
13    private Exception failureException;
14
15    public StatefulRunnable(String taskId, Runnable actualTask) {
16        this.taskId = taskId;
17        this.actualTask = actualTask;
18    }
19
20    @Override
21    public void run() {
22        synchronized (stateLock) {
23            if (currentState != State.CREATED) {
24                throw new IllegalStateException("Task has already been executed");
25            }
26            currentState = State.RUNNING;
27        }
28
29        try {
30            actualTask.run();
31
32            synchronized (stateLock) {
33                if (currentState == State.RUNNING) {
34                    currentState = State.COMPLETED;
35                }
36            }
37
38        } catch (Exception e) {
39            synchronized (stateLock) {
40                currentState = State.FAILED;
41                failureException = e;
42            }
43            throw e;
44
45        } finally {
46            completionLatch.countDown();
47        }
48    }
49
50    public State getCurrentState() {
51        synchronized (stateLock) {
52            return currentState;
53        }
54    }
55
56    public void pause() {
57        synchronized (stateLock) {
58            if (currentState == State.RUNNING) {
59                currentState = State.PAUSED;
60            }
61        }
62        // Note: Actual pause implementation would require cooperation from the task
63    }
64
65    public void cancel() {
66        synchronized (stateLock) {
67            if (currentState == State.CREATED || currentState == State.RUNNING || currentState == State.PAUSED) {
68                currentState = State.CANCELLED;
69            }
70        }
71    }
72
73    public boolean waitForCompletion(long timeoutMs) throws InterruptedException {
74        return completionLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
75    }
76
77    public Exception getFailureException() {
78        synchronized (stateLock) {
79            return failureException;
80        }
81    }
82
83    public String getTaskId() {
84        return taskId;
85    }
86}

๐Ÿ“ž Callable Interface: Advanced Task Execution

๐Ÿ” Callable Internals and Patterns

The Callable interface extends the concept of Runnable by adding return values and checked exception handling.

1@FunctionalInterface
2public interface Callable<V> {
3    /**
4     * Computes a result, or throws an exception if unable to do so.
5     */
6    V call() throws Exception;
7}

๐Ÿ› ๏ธ Advanced Callable Patterns

1. Timeout-Aware Callable:

 1public class TimeoutAwareCallable<V> implements Callable<V> {
 2
 3    private final Callable<V> delegate;
 4    private final long timeoutMs;
 5    private final V timeoutValue;
 6    private final boolean throwOnTimeout;
 7
 8    public TimeoutAwareCallable(Callable<V> delegate, long timeoutMs, V timeoutValue) {
 9        this(delegate, timeoutMs, timeoutValue, false);
10    }
11
12    public TimeoutAwareCallable(Callable<V> delegate, long timeoutMs, boolean throwOnTimeout) {
13        this(delegate, timeoutMs, null, throwOnTimeout);
14    }
15
16    private TimeoutAwareCallable(Callable<V> delegate, long timeoutMs, V timeoutValue, boolean throwOnTimeout) {
17        this.delegate = delegate;
18        this.timeoutMs = timeoutMs;
19        this.timeoutValue = timeoutValue;
20        this.throwOnTimeout = throwOnTimeout;
21    }
22
23    @Override
24    public V call() throws Exception {
25        ExecutorService executor = Executors.newSingleThreadExecutor();
26        try {
27            Future<V> future = executor.submit(delegate);
28            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
29
30        } catch (TimeoutException e) {
31            if (throwOnTimeout) {
32                throw new CallableTimeoutException("Callable timed out after " + timeoutMs + "ms", e);
33            }
34            return timeoutValue;
35
36        } finally {
37            executor.shutdownNow();
38        }
39    }
40
41    public static class CallableTimeoutException extends Exception {
42        public CallableTimeoutException(String message, Throwable cause) {
43            super(message, cause);
44        }
45    }
46}

2. Retry-Enabled Callable:

 1public class RetryableCallable<V> implements Callable<V> {
 2
 3    private final Callable<V> delegate;
 4    private final int maxRetries;
 5    private final long retryDelayMs;
 6    private final double backoffMultiplier;
 7    private final Predicate<Exception> retryPredicate;
 8
 9    public RetryableCallable(Callable<V> delegate, int maxRetries) {
10        this(delegate, maxRetries, 1000, 2.0, ex -> true);
11    }
12
13    public RetryableCallable(Callable<V> delegate,
14                           int maxRetries,
15                           long retryDelayMs,
16                           double backoffMultiplier,
17                           Predicate<Exception> retryPredicate) {
18        this.delegate = delegate;
19        this.maxRetries = maxRetries;
20        this.retryDelayMs = retryDelayMs;
21        this.backoffMultiplier = backoffMultiplier;
22        this.retryPredicate = retryPredicate;
23    }
24
25    @Override
26    public V call() throws Exception {
27        Exception lastException = null;
28        long currentDelay = retryDelayMs;
29
30        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
31            try {
32                return delegate.call();
33
34            } catch (Exception e) {
35                lastException = e;
36
37                // Don't retry if this is the last attempt or exception is not retryable
38                if (attempt > maxRetries || !retryPredicate.test(e)) {
39                    break;
40                }
41
42                System.out.println("Attempt " + attempt + " failed, retrying in " + currentDelay + "ms: " + e.getMessage());
43
44                try {
45                    Thread.sleep(currentDelay);
46                } catch (InterruptedException ie) {
47                    Thread.currentThread().interrupt();
48                    throw new Exception("Interrupted during retry delay", ie);
49                }
50
51                currentDelay = (long) (currentDelay * backoffMultiplier);
52            }
53        }
54
55        throw new Exception("Failed after " + (maxRetries + 1) + " attempts", lastException);
56    }
57
58    // Factory methods for common retry scenarios
59    public static <V> RetryableCallable<V> withExponentialBackoff(Callable<V> delegate, int maxRetries) {
60        return new RetryableCallable<>(delegate, maxRetries, 100, 2.0, ex -> true);
61    }
62
63    public static <V> RetryableCallable<V> forNetworkOperations(Callable<V> delegate) {
64        return new RetryableCallable<>(
65            delegate,
66            3,
67            500,
68            2.0,
69            ex -> ex instanceof IOException || ex instanceof SocketTimeoutException
70        );
71    }
72}

3. Cached Callable Pattern:

 1public class CachedCallable<V> implements Callable<V> {
 2
 3    private final Callable<V> delegate;
 4    private final long cacheValidityMs;
 5    private final String cacheKey;
 6
 7    private volatile CacheEntry<V> cachedResult;
 8    private final Object cacheLock = new Object();
 9
10    public CachedCallable(Callable<V> delegate, long cacheValidityMs, String cacheKey) {
11        this.delegate = delegate;
12        this.cacheValidityMs = cacheValidityMs;
13        this.cacheKey = cacheKey;
14    }
15
16    @Override
17    public V call() throws Exception {
18        CacheEntry<V> current = cachedResult;
19
20        // Fast path: check if cache is still valid
21        if (current != null && current.isValid()) {
22            return current.value;
23        }
24
25        // Slow path: need to compute new value
26        synchronized (cacheLock) {
27            // Double-check pattern
28            current = cachedResult;
29            if (current != null && current.isValid()) {
30                return current.value;
31            }
32
33            try {
34                V newValue = delegate.call();
35                cachedResult = new CacheEntry<>(newValue, System.currentTimeMillis(), cacheValidityMs);
36                return newValue;
37
38            } catch (Exception e) {
39                // Optionally return stale cache entry on error
40                if (current != null) {
41                    System.err.println("Returning stale cached value due to error: " + e.getMessage());
42                    return current.value;
43                }
44                throw e;
45            }
46        }
47    }
48
49    public void invalidateCache() {
50        synchronized (cacheLock) {
51            cachedResult = null;
52        }
53    }
54
55    public boolean isCacheValid() {
56        CacheEntry<V> current = cachedResult;
57        return current != null && current.isValid();
58    }
59
60    private static class CacheEntry<V> {
61        private final V value;
62        private final long timestamp;
63        private final long validityMs;
64
65        public CacheEntry(V value, long timestamp, long validityMs) {
66            this.value = value;
67            this.timestamp = timestamp;
68            this.validityMs = validityMs;
69        }
70
71        public boolean isValid() {
72            return System.currentTimeMillis() - timestamp < validityMs;
73        }
74    }
75}

๐Ÿง  Java Memory Model and Thread Safety

๐Ÿ” Understanding the Java Memory Model

The Java Memory Model (JMM) defines how threads interact through memory and what behaviors are allowed in concurrent execution.

graph TD
    A[Main Memory] --> B[Thread 1 Local Memory]
    A --> C[Thread 2 Local Memory]

    B --> D[Thread 1 Execution Engine]
    C --> E[Thread 2 Execution Engine]

    F[Synchronization Actions] --> G[volatile reads/writes]
    F --> H[synchronized blocks]
    F --> I[Lock operations]
    F --> J[Atomic operations]

    G --> K[Memory Visibility]
    H --> K
    I --> K
    J --> K

    K --> L[happens-before Relationship]

    style A fill:#ff6b35
    style K fill:#4ecdc4
    style L fill:#feca57

๐Ÿ› ๏ธ Memory Model Practical Examples

1. Volatile Variables and Visibility:

 1public class VolatileExample {
 2
 3    private volatile boolean running = true;
 4    private volatile int counter = 0;
 5
 6    // Non-volatile variables for comparison
 7    private boolean nonVolatileRunning = true;
 8    private int nonVolatileCounter = 0;
 9
10    public void demonstrateVolatileVisibility() throws InterruptedException {
11        // Worker thread that depends on volatile flag
12        Thread worker = new Thread(() -> {
13            int localCounter = 0;
14            while (running) {  // volatile read ensures visibility
15                localCounter++;
16                counter++;     // volatile write ensures visibility
17
18                if (localCounter % 1000000 == 0) {
19                    System.out.println("Worker thread running, counter: " + counter);
20                }
21            }
22            System.out.println("Worker thread stopped, final counter: " + counter);
23        });
24
25        // Controller thread that modifies the flag
26        Thread controller = new Thread(() -> {
27            try {
28                Thread.sleep(2000);
29                System.out.println("Controller stopping worker thread");
30                running = false;  // volatile write ensures visibility
31            } catch (InterruptedException e) {
32                Thread.currentThread().interrupt();
33            }
34        });
35
36        worker.start();
37        controller.start();
38
39        worker.join();
40        controller.join();
41
42        System.out.println("Final counter value: " + counter);
43    }
44
45    // Demonstration of non-volatile behavior (may not terminate!)
46    public void demonstrateNonVolatileBehavior() {
47        Thread worker = new Thread(() -> {
48            while (nonVolatileRunning) {  // Non-volatile read - may not see updates!
49                nonVolatileCounter++;
50            }
51            System.out.println("Non-volatile worker stopped");
52        });
53
54        Thread controller = new Thread(() -> {
55            try {
56                Thread.sleep(1000);
57                nonVolatileRunning = false;  // Non-volatile write - may not be visible!
58                System.out.println("Non-volatile flag set to false");
59            } catch (InterruptedException e) {
60                Thread.currentThread().interrupt();
61            }
62        });
63
64        worker.start();
65        controller.start();
66
67        // Warning: This may run forever due to lack of visibility!
68    }
69}

2. Synchronized Blocks and Memory Synchronization:

  1public class SynchronizedExample {
  2
  3    private int sharedCounter = 0;
  4    private final Object counterLock = new Object();
  5
  6    private final Map<String, Integer> sharedMap = new HashMap<>();
  7    private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
  8
  9    public void demonstrateSynchronizedBlocks() throws InterruptedException {
 10        int threadCount = 10;
 11        int incrementsPerThread = 1000;
 12
 13        List<Thread> threads = new ArrayList<>();
 14
 15        for (int i = 0; i < threadCount; i++) {
 16            Thread t = new Thread(() -> {
 17                for (int j = 0; j < incrementsPerThread; j++) {
 18                    incrementCounterSafely();
 19                }
 20            });
 21            threads.add(t);
 22        }
 23
 24        long startTime = System.nanoTime();
 25
 26        // Start all threads
 27        for (Thread t : threads) {
 28            t.start();
 29        }
 30
 31        // Wait for all threads to complete
 32        for (Thread t : threads) {
 33            t.join();
 34        }
 35
 36        long duration = System.nanoTime() - startTime;
 37
 38        System.out.println("Final counter value: " + sharedCounter);
 39        System.out.println("Expected value: " + (threadCount * incrementsPerThread));
 40        System.out.println("Duration: " + duration / 1_000_000 + " ms");
 41    }
 42
 43    private void incrementCounterSafely() {
 44        synchronized (counterLock) {
 45            // Critical section - only one thread can execute this at a time
 46            int temp = sharedCounter;
 47            temp = temp + 1;
 48            sharedCounter = temp;
 49
 50            // Memory synchronization happens at the end of synchronized block
 51        }
 52        // happens-before relationship established with subsequent synchronized blocks
 53    }
 54
 55    public void demonstrateReadWriteLocks() throws InterruptedException {
 56        int readerCount = 5;
 57        int writerCount = 2;
 58
 59        CountDownLatch startLatch = new CountDownLatch(1);
 60        CountDownLatch completionLatch = new CountDownLatch(readerCount + writerCount);
 61
 62        // Reader threads
 63        for (int i = 0; i < readerCount; i++) {
 64            final int readerId = i;
 65            Thread reader = new Thread(() -> {
 66                try {
 67                    startLatch.await();
 68
 69                    for (int j = 0; j < 10; j++) {
 70                        readFromMap("reader-" + readerId + "-key-" + j);
 71                        Thread.sleep(10);
 72                    }
 73
 74                } catch (InterruptedException e) {
 75                    Thread.currentThread().interrupt();
 76                } finally {
 77                    completionLatch.countDown();
 78                }
 79            });
 80            reader.start();
 81        }
 82
 83        // Writer threads
 84        for (int i = 0; i < writerCount; i++) {
 85            final int writerId = i;
 86            Thread writer = new Thread(() -> {
 87                try {
 88                    startLatch.await();
 89
 90                    for (int j = 0; j < 5; j++) {
 91                        writeToMap("writer-" + writerId + "-key-" + j, j);
 92                        Thread.sleep(50);
 93                    }
 94
 95                } catch (InterruptedException e) {
 96                    Thread.currentThread().interrupt();
 97                } finally {
 98                    completionLatch.countDown();
 99                }
100            });
101            writer.start();
102        }
103
104        startLatch.countDown(); // Start all threads
105        completionLatch.await(); // Wait for completion
106
107        System.out.println("Map size after concurrent operations: " + sharedMap.size());
108    }
109
110    private Integer readFromMap(String key) {
111        mapLock.readLock().lock();
112        try {
113            Integer value = sharedMap.get(key);
114            System.out.println(Thread.currentThread().getName() + " read: " + key + " = " + value);
115            return value;
116        } finally {
117            mapLock.readLock().unlock();
118        }
119    }
120
121    private void writeToMap(String key, Integer value) {
122        mapLock.writeLock().lock();
123        try {
124            sharedMap.put(key, value);
125            System.out.println(Thread.currentThread().getName() + " wrote: " + key + " = " + value);
126        } finally {
127            mapLock.writeLock().unlock();
128        }
129    }
130}

๐Ÿ”„ Advanced Synchronization Patterns

๐Ÿšฆ CountDownLatch Pattern

 1public class CountDownLatchExample {
 2
 3    public void demonstrateServiceStartup() throws InterruptedException {
 4        int serviceCount = 5;
 5        CountDownLatch startupLatch = new CountDownLatch(serviceCount);
 6        CountDownLatch readyLatch = new CountDownLatch(1);
 7
 8        List<ServiceSimulator> services = new ArrayList<>();
 9
10        // Create and start services
11        for (int i = 0; i < serviceCount; i++) {
12            ServiceSimulator service = new ServiceSimulator("Service-" + i, startupLatch, readyLatch);
13            services.add(service);
14            new Thread(service).start();
15        }
16
17        System.out.println("Waiting for all services to start...");
18
19        // Wait for all services to be ready
20        startupLatch.await();
21
22        System.out.println("All services started! Signaling ready state...");
23
24        // Signal all services to begin processing
25        readyLatch.countDown();
26
27        // Wait a bit to see services processing
28        Thread.sleep(3000);
29
30        // Shutdown services
31        services.forEach(ServiceSimulator::shutdown);
32    }
33
34    private static class ServiceSimulator implements Runnable {
35        private final String serviceName;
36        private final CountDownLatch startupLatch;
37        private final CountDownLatch readyLatch;
38        private volatile boolean running = true;
39
40        public ServiceSimulator(String serviceName, CountDownLatch startupLatch, CountDownLatch readyLatch) {
41            this.serviceName = serviceName;
42            this.startupLatch = startupLatch;
43            this.readyLatch = readyLatch;
44        }
45
46        @Override
47        public void run() {
48            try {
49                // Simulate startup time
50                Thread.sleep((long) (Math.random() * 2000) + 500);
51
52                System.out.println(serviceName + " started and ready");
53                startupLatch.countDown();
54
55                // Wait for all services to be ready
56                readyLatch.await();
57
58                // Begin processing
59                System.out.println(serviceName + " beginning processing");
60
61                while (running) {
62                    // Simulate work
63                    Thread.sleep(500);
64                    System.out.println(serviceName + " processing...");
65                }
66
67                System.out.println(serviceName + " shutting down");
68
69            } catch (InterruptedException e) {
70                Thread.currentThread().interrupt();
71                System.out.println(serviceName + " interrupted");
72            }
73        }
74
75        public void shutdown() {
76            running = false;
77        }
78    }
79}

๐Ÿ”„ CyclicBarrier Pattern

 1public class CyclicBarrierExample {
 2
 3    public void demonstrateMultiPhaseComputation() throws InterruptedException, BrokenBarrierException {
 4        int workerCount = 4;
 5        int phaseCount = 3;
 6
 7        CyclicBarrier phaseBarrier = new CyclicBarrier(workerCount, () -> {
 8            System.out.println("Phase completed by all workers!");
 9        });
10
11        CountDownLatch completionLatch = new CountDownLatch(workerCount);
12
13        for (int i = 0; i < workerCount; i++) {
14            final int workerId = i;
15            Thread worker = new Thread(new PhaseWorker(workerId, phaseCount, phaseBarrier, completionLatch));
16            worker.start();
17        }
18
19        completionLatch.await();
20        System.out.println("All workers completed all phases!");
21    }
22
23    private static class PhaseWorker implements Runnable {
24        private final int workerId;
25        private final int phaseCount;
26        private final CyclicBarrier barrier;
27        private final CountDownLatch completionLatch;
28
29        public PhaseWorker(int workerId, int phaseCount, CyclicBarrier barrier, CountDownLatch completionLatch) {
30            this.workerId = workerId;
31            this.phaseCount = phaseCount;
32            this.barrier = barrier;
33            this.completionLatch = completionLatch;
34        }
35
36        @Override
37        public void run() {
38            try {
39                for (int phase = 1; phase <= phaseCount; phase++) {
40                    // Simulate work for this phase
41                    long workTime = (long) (Math.random() * 1000) + 500;
42                    Thread.sleep(workTime);
43
44                    System.out.println("Worker " + workerId + " completed phase " + phase +
45                        " (took " + workTime + "ms)");
46
47                    // Wait for all workers to complete this phase
48                    barrier.await();
49
50                    System.out.println("Worker " + workerId + " proceeding to next phase");
51                }
52
53                System.out.println("Worker " + workerId + " finished all phases");
54
55            } catch (InterruptedException | BrokenBarrierException e) {
56                System.err.println("Worker " + workerId + " interrupted: " + e.getMessage());
57                Thread.currentThread().interrupt();
58            } finally {
59                completionLatch.countDown();
60            }
61        }
62    }
63}

๐ŸŽซ Semaphore Pattern

  1public class SemaphoreExample {
  2
  3    private final Semaphore connectionPool;
  4    private final List<Connection> connections;
  5    private final Queue<Connection> availableConnections;
  6
  7    public SemaphoreExample(int poolSize) {
  8        this.connectionPool = new Semaphore(poolSize, true); // Fair semaphore
  9        this.connections = new ArrayList<>();
 10        this.availableConnections = new ConcurrentLinkedQueue<>();
 11
 12        // Initialize connection pool
 13        for (int i = 0; i < poolSize; i++) {
 14            Connection conn = new MockConnection("Connection-" + i);
 15            connections.add(conn);
 16            availableConnections.offer(conn);
 17        }
 18    }
 19
 20    public Connection acquireConnection() throws InterruptedException {
 21        connectionPool.acquire(); // Wait for available permit
 22
 23        Connection connection = availableConnections.poll();
 24        if (connection == null) {
 25            // This shouldn't happen if semaphore is working correctly
 26            throw new IllegalStateException("No connection available despite acquired permit");
 27        }
 28
 29        System.out.println(Thread.currentThread().getName() + " acquired " + connection.getId());
 30        return connection;
 31    }
 32
 33    public Connection acquireConnectionWithTimeout(long timeoutMs) throws InterruptedException {
 34        boolean acquired = connectionPool.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
 35        if (!acquired) {
 36            throw new RuntimeException("Could not acquire connection within " + timeoutMs + "ms");
 37        }
 38
 39        Connection connection = availableConnections.poll();
 40        if (connection == null) {
 41            connectionPool.release(); // Release permit if no connection available
 42            throw new IllegalStateException("No connection available despite acquired permit");
 43        }
 44
 45        System.out.println(Thread.currentThread().getName() + " acquired " + connection.getId() + " with timeout");
 46        return connection;
 47    }
 48
 49    public void releaseConnection(Connection connection) {
 50        if (connection != null && connections.contains(connection)) {
 51            availableConnections.offer(connection);
 52            connectionPool.release();
 53            System.out.println(Thread.currentThread().getName() + " released " + connection.getId());
 54        }
 55    }
 56
 57    public int getAvailableConnections() {
 58        return connectionPool.availablePermits();
 59    }
 60
 61    public void demonstrateConnectionPool() throws InterruptedException {
 62        int clientCount = 10;
 63        CountDownLatch completionLatch = new CountDownLatch(clientCount);
 64
 65        for (int i = 0; i < clientCount; i++) {
 66            final int clientId = i;
 67            Thread client = new Thread(() -> {
 68                try {
 69                    // Acquire connection
 70                    Connection conn = acquireConnection();
 71
 72                    // Simulate work with connection
 73                    Thread.sleep((long) (Math.random() * 2000) + 500);
 74
 75                    // Release connection
 76                    releaseConnection(conn);
 77
 78                } catch (InterruptedException e) {
 79                    Thread.currentThread().interrupt();
 80                    System.err.println("Client " + clientId + " interrupted");
 81                } finally {
 82                    completionLatch.countDown();
 83                }
 84            }, "Client-" + clientId);
 85
 86            client.start();
 87        }
 88
 89        completionLatch.await();
 90        System.out.println("All clients completed. Available connections: " + getAvailableConnections());
 91    }
 92
 93    // Mock connection class for demonstration
 94    private static class MockConnection implements Connection {
 95        private final String id;
 96
 97        public MockConnection(String id) {
 98            this.id = id;
 99        }
100
101        public String getId() {
102            return id;
103        }
104
105        // Implement other Connection methods as no-ops for demo
106        @Override public Statement createStatement() { return null; }
107        @Override public PreparedStatement prepareStatement(String sql) { return null; }
108        @Override public CallableStatement prepareCall(String sql) { return null; }
109        @Override public String nativeSQL(String sql) { return null; }
110        @Override public void setAutoCommit(boolean autoCommit) {}
111        @Override public boolean getAutoCommit() { return false; }
112        @Override public void commit() {}
113        @Override public void rollback() {}
114        @Override public void close() {}
115        @Override public boolean isClosed() { return false; }
116        // ... other methods omitted for brevity
117    }
118}

๐Ÿงช Testing Concurrent Code

๐Ÿ” Concurrency Testing Strategies

  1@Component
  2public class ConcurrencyTestingExample {
  3
  4    private final AtomicInteger concurrentCounter = new AtomicInteger(0);
  5    private int nonAtomicCounter = 0;
  6    private final Object counterLock = new Object();
  7
  8    /**
  9     * Test race conditions with high contention
 10     */
 11    public void testRaceConditions() throws InterruptedException {
 12        int threadCount = 100;
 13        int incrementsPerThread = 1000;
 14        int expectedTotal = threadCount * incrementsPerThread;
 15
 16        // Test 1: Non-atomic counter (will have race conditions)
 17        testNonAtomicCounter(threadCount, incrementsPerThread, expectedTotal);
 18
 19        // Test 2: Atomic counter (should be correct)
 20        testAtomicCounter(threadCount, incrementsPerThread, expectedTotal);
 21
 22        // Test 3: Synchronized counter (should be correct but slower)
 23        testSynchronizedCounter(threadCount, incrementsPerThread, expectedTotal);
 24    }
 25
 26    private void testNonAtomicCounter(int threadCount, int incrementsPerThread, int expectedTotal)
 27            throws InterruptedException {
 28
 29        nonAtomicCounter = 0;
 30        CountDownLatch startLatch = new CountDownLatch(1);
 31        CountDownLatch completionLatch = new CountDownLatch(threadCount);
 32
 33        List<Thread> threads = new ArrayList<>();
 34
 35        for (int i = 0; i < threadCount; i++) {
 36            Thread t = new Thread(() -> {
 37                try {
 38                    startLatch.await(); // Wait for all threads to be ready
 39
 40                    for (int j = 0; j < incrementsPerThread; j++) {
 41                        nonAtomicCounter++; // Race condition here!
 42                    }
 43
 44                } catch (InterruptedException e) {
 45                    Thread.currentThread().interrupt();
 46                } finally {
 47                    completionLatch.countDown();
 48                }
 49            });
 50            threads.add(t);
 51            t.start();
 52        }
 53
 54        long startTime = System.nanoTime();
 55        startLatch.countDown(); // Start all threads simultaneously
 56        completionLatch.await(); // Wait for completion
 57        long duration = System.nanoTime() - startTime;
 58
 59        System.out.println("Non-atomic counter test:");
 60        System.out.println("  Expected: " + expectedTotal);
 61        System.out.println("  Actual: " + nonAtomicCounter);
 62        System.out.println("  Lost increments: " + (expectedTotal - nonAtomicCounter));
 63        System.out.println("  Duration: " + duration / 1_000_000 + " ms");
 64        System.out.println();
 65    }
 66
 67    private void testAtomicCounter(int threadCount, int incrementsPerThread, int expectedTotal)
 68            throws InterruptedException {
 69
 70        concurrentCounter.set(0);
 71        CountDownLatch startLatch = new CountDownLatch(1);
 72        CountDownLatch completionLatch = new CountDownLatch(threadCount);
 73
 74        List<Thread> threads = new ArrayList<>();
 75
 76        for (int i = 0; i < threadCount; i++) {
 77            Thread t = new Thread(() -> {
 78                try {
 79                    startLatch.await();
 80
 81                    for (int j = 0; j < incrementsPerThread; j++) {
 82                        concurrentCounter.incrementAndGet(); // Thread-safe atomic operation
 83                    }
 84
 85                } catch (InterruptedException e) {
 86                    Thread.currentThread().interrupt();
 87                } finally {
 88                    completionLatch.countDown();
 89                }
 90            });
 91            threads.add(t);
 92            t.start();
 93        }
 94
 95        long startTime = System.nanoTime();
 96        startLatch.countDown();
 97        completionLatch.await();
 98        long duration = System.nanoTime() - startTime;
 99
100        System.out.println("Atomic counter test:");
101        System.out.println("  Expected: " + expectedTotal);
102        System.out.println("  Actual: " + concurrentCounter.get());
103        System.out.println("  Correct: " + (concurrentCounter.get() == expectedTotal));
104        System.out.println("  Duration: " + duration / 1_000_000 + " ms");
105        System.out.println();
106    }
107
108    private void testSynchronizedCounter(int threadCount, int incrementsPerThread, int expectedTotal)
109            throws InterruptedException {
110
111        int synchronizedCounter = 0;
112        CountDownLatch startLatch = new CountDownLatch(1);
113        CountDownLatch completionLatch = new CountDownLatch(threadCount);
114        AtomicInteger syncCounter = new AtomicInteger(0);
115
116        List<Thread> threads = new ArrayList<>();
117
118        for (int i = 0; i < threadCount; i++) {
119            Thread t = new Thread(() -> {
120                try {
121                    startLatch.await();
122
123                    for (int j = 0; j < incrementsPerThread; j++) {
124                        synchronized (counterLock) {
125                            syncCounter.incrementAndGet();
126                        }
127                    }
128
129                } catch (InterruptedException e) {
130                    Thread.currentThread().interrupt();
131                } finally {
132                    completionLatch.countDown();
133                }
134            });
135            threads.add(t);
136            t.start();
137        }
138
139        long startTime = System.nanoTime();
140        startLatch.countDown();
141        completionLatch.await();
142        long duration = System.nanoTime() - startTime;
143
144        System.out.println("Synchronized counter test:");
145        System.out.println("  Expected: " + expectedTotal);
146        System.out.println("  Actual: " + syncCounter.get());
147        System.out.println("  Correct: " + (syncCounter.get() == expectedTotal));
148        System.out.println("  Duration: " + duration / 1_000_000 + " ms");
149        System.out.println();
150    }
151
152    /**
153     * Test for deadlock detection
154     */
155    public void testDeadlockScenario() throws InterruptedException {
156        Object lock1 = new Object();
157        Object lock2 = new Object();
158        CountDownLatch deadlockLatch = new CountDownLatch(2);
159
160        // Thread 1: acquires lock1, then tries to acquire lock2
161        Thread thread1 = new Thread(() -> {
162            synchronized (lock1) {
163                System.out.println("Thread 1: Acquired lock1");
164
165                try {
166                    Thread.sleep(100); // Give thread2 time to acquire lock2
167                } catch (InterruptedException e) {
168                    Thread.currentThread().interrupt();
169                    return;
170                }
171
172                System.out.println("Thread 1: Trying to acquire lock2");
173                synchronized (lock2) {
174                    System.out.println("Thread 1: Acquired lock2");
175                }
176            }
177            deadlockLatch.countDown();
178        });
179
180        // Thread 2: acquires lock2, then tries to acquire lock1
181        Thread thread2 = new Thread(() -> {
182            synchronized (lock2) {
183                System.out.println("Thread 2: Acquired lock2");
184
185                try {
186                    Thread.sleep(100); // Give thread1 time to acquire lock1
187                } catch (InterruptedException e) {
188                    Thread.currentThread().interrupt();
189                    return;
190                }
191
192                System.out.println("Thread 2: Trying to acquire lock1");
193                synchronized (lock1) {
194                    System.out.println("Thread 2: Acquired lock1");
195                }
196            }
197            deadlockLatch.countDown();
198        });
199
200        thread1.start();
201        thread2.start();
202
203        // Wait for potential deadlock with timeout
204        boolean completed = deadlockLatch.await(5, TimeUnit.SECONDS);
205
206        if (!completed) {
207            System.out.println("Deadlock detected! Threads did not complete within timeout.");
208
209            // Interrupt threads to break deadlock
210            thread1.interrupt();
211            thread2.interrupt();
212        } else {
213            System.out.println("No deadlock - both threads completed successfully.");
214        }
215    }
216}

๐Ÿ“Š Performance Analysis Tools

๐Ÿ” Benchmarking Concurrent Operations

  1@Component
  2public class ConcurrencyBenchmark {
  3
  4    private static final int OPERATIONS_COUNT = 1_000_000;
  5
  6    public void benchmarkConcurrencyMechanisms() {
  7        System.out.println("Benchmarking concurrency mechanisms with " + OPERATIONS_COUNT + " operations...\n");
  8
  9        benchmarkAtomicOperations();
 10        benchmarkSynchronizedOperations();
 11        benchmarkLockOperations();
 12        benchmarkVolatileOperations();
 13    }
 14
 15    private void benchmarkAtomicOperations() {
 16        AtomicInteger atomicCounter = new AtomicInteger(0);
 17
 18        long startTime = System.nanoTime();
 19
 20        IntStream.range(0, OPERATIONS_COUNT)
 21            .parallel()
 22            .forEach(i -> atomicCounter.incrementAndGet());
 23
 24        long duration = System.nanoTime() - startTime;
 25
 26        System.out.println("Atomic Operations Benchmark:");
 27        System.out.println("  Operations: " + OPERATIONS_COUNT);
 28        System.out.println("  Final value: " + atomicCounter.get());
 29        System.out.println("  Duration: " + duration / 1_000_000 + " ms");
 30        System.out.println("  Ops/second: " + (OPERATIONS_COUNT * 1_000_000_000L / duration));
 31        System.out.println();
 32    }
 33
 34    private void benchmarkSynchronizedOperations() {
 35        Counter synchronizedCounter = new SynchronizedCounter();
 36
 37        long startTime = System.nanoTime();
 38
 39        IntStream.range(0, OPERATIONS_COUNT)
 40            .parallel()
 41            .forEach(i -> synchronizedCounter.increment());
 42
 43        long duration = System.nanoTime() - startTime;
 44
 45        System.out.println("Synchronized Operations Benchmark:");
 46        System.out.println("  Operations: " + OPERATIONS_COUNT);
 47        System.out.println("  Final value: " + synchronizedCounter.getValue());
 48        System.out.println("  Duration: " + duration / 1_000_000 + " ms");
 49        System.out.println("  Ops/second: " + (OPERATIONS_COUNT * 1_000_000_000L / duration));
 50        System.out.println();
 51    }
 52
 53    private void benchmarkLockOperations() {
 54        Counter lockCounter = new LockCounter();
 55
 56        long startTime = System.nanoTime();
 57
 58        IntStream.range(0, OPERATIONS_COUNT)
 59            .parallel()
 60            .forEach(i -> lockCounter.increment());
 61
 62        long duration = System.nanoTime() - startTime;
 63
 64        System.out.println("Lock Operations Benchmark:");
 65        System.out.println("  Operations: " + OPERATIONS_COUNT);
 66        System.out.println("  Final value: " + lockCounter.getValue());
 67        System.out.println("  Duration: " + duration / 1_000_000 + " ms");
 68        System.out.println("  Ops/second: " + (OPERATIONS_COUNT * 1_000_000_000L / duration));
 69        System.out.println();
 70    }
 71
 72    private void benchmarkVolatileOperations() {
 73        VolatileCounter volatileCounter = new VolatileCounter();
 74
 75        long startTime = System.nanoTime();
 76
 77        // Note: This will have race conditions, but we're testing volatile performance
 78        IntStream.range(0, OPERATIONS_COUNT)
 79            .parallel()
 80            .forEach(i -> volatileCounter.increment());
 81
 82        long duration = System.nanoTime() - startTime;
 83
 84        System.out.println("Volatile Operations Benchmark (with race conditions):");
 85        System.out.println("  Operations: " + OPERATIONS_COUNT);
 86        System.out.println("  Final value: " + volatileCounter.getValue());
 87        System.out.println("  Lost increments: " + (OPERATIONS_COUNT - volatileCounter.getValue()));
 88        System.out.println("  Duration: " + duration / 1_000_000 + " ms");
 89        System.out.println("  Ops/second: " + (OPERATIONS_COUNT * 1_000_000_000L / duration));
 90        System.out.println();
 91    }
 92
 93    interface Counter {
 94        void increment();
 95        int getValue();
 96    }
 97
 98    static class SynchronizedCounter implements Counter {
 99        private int value = 0;
100
101        @Override
102        public synchronized void increment() {
103            value++;
104        }
105
106        @Override
107        public synchronized int getValue() {
108            return value;
109        }
110    }
111
112    static class LockCounter implements Counter {
113        private int value = 0;
114        private final ReentrantLock lock = new ReentrantLock();
115
116        @Override
117        public void increment() {
118            lock.lock();
119            try {
120                value++;
121            } finally {
122                lock.unlock();
123            }
124        }
125
126        @Override
127        public int getValue() {
128            lock.lock();
129            try {
130                return value;
131            } finally {
132                lock.unlock();
133            }
134        }
135    }
136
137    static class VolatileCounter implements Counter {
138        private volatile int value = 0;
139
140        @Override
141        public void increment() {
142            value++; // Race condition! Not atomic despite volatile
143        }
144
145        @Override
146        public int getValue() {
147            return value;
148        }
149    }
150}

๐ŸŽฏ Conclusion and Advanced Patterns Summary

๐Ÿ† Key Takeaways from Deep Dive

This comprehensive deep dive into Java concurrency fundamentals reveals several critical insights:

  1. Runnable Patterns: Beyond simple task execution, advanced patterns like interruptible, parameterized, and stateful runnables provide robust foundations for complex concurrent systems.

  2. Callable Advanced Usage: Timeout-aware, retry-enabled, and cached callables demonstrate how to build resilient distributed systems with proper error handling and performance optimization.

  3. Memory Model Mastery: Understanding the Java Memory Model is crucial for writing correct concurrent code, especially regarding visibility, ordering, and synchronization.

  4. Synchronization Mechanisms: Different synchronization primitives serve different purposes - choose based on contention patterns, performance requirements, and complexity needs.

๐Ÿš€ Best Practices for Advanced Concurrency

graph TD
    A[Advanced Concurrency Principles] --> B[Choose Right Abstraction]
    A --> C[Understand Memory Model]
    A --> D[Test Thoroughly]
    A --> E[Monitor Performance]

    B --> F[Simple Tasks: Runnable]
    B --> G[Return Values: Callable]
    B --> H[Complex Flows: CompletableFuture]

    C --> I[Volatile for Flags]
    C --> J[Synchronized for Mutual Exclusion]
    C --> K[Atomic for Counters]

    D --> L[Race Condition Tests]
    D --> M[Deadlock Detection]
    D --> N[Load Testing]

    E --> O[Thread Pool Metrics]
    E --> P[Contention Analysis]
    E --> Q[Memory Usage]

    style A fill:#ff6b6b
    style C fill:#4ecdc4
    style E fill:#feca57

By mastering these fundamental patterns and understanding their internal mechanisms, you can build highly efficient, scalable, and maintainable concurrent Java applications. Remember that concurrency is not just about making things run in parallel - it’s about designing systems that can safely and efficiently coordinate multiple simultaneous operations while maintaining correctness and performance.

The patterns and techniques covered in this deep dive form the foundation for more advanced concurrency concepts like reactive programming, actor models, and distributed computing frameworks. Master these basics first, and you’ll be well-equipped to tackle any concurrent programming challenge in Java.