๐ฏ Introduction
Building upon our comprehensive overview of Java concurrency, this deep dive explores the fundamental building blocks that power Java’s threading mechanisms. We’ll dissect the internals of Runnable and Callable interfaces, examine thread synchronization primitives, understand the Java Memory Model, and explore advanced patterns that form the foundation of robust concurrent applications.
This technical deep dive is essential for developers who want to understand not just how to use Java’s concurrency tools, but how they work under the hood and how to leverage them effectively in complex scenarios.
๐ฌ Runnable Interface: The Foundation of Java Threading
๐ Understanding Runnable Internals
The Runnable interface is the most fundamental abstraction for representing a task that can be executed by a thread. Let’s examine its internal structure and behavior patterns.
1@FunctionalInterface
2public interface Runnable {
3 /**
4 * When an object implementing interface <code>Runnable</code> is used
5 * to create a thread, starting the thread causes the object's
6 * <code>run</code> method to be called in that separately executing
7 * thread.
8 */
9 public abstract void run();
10}
๐ Deep Dive: Thread Execution Lifecycle
graph TD
A[Thread.start() called] --> B[Thread enters RUNNABLE state]
B --> C[Thread Scheduler assigns CPU]
C --> D[run() method execution begins]
D --> E{Exception thrown?}
E -->|No| F[run() completes normally]
E -->|Yes| G[UncaughtExceptionHandler called]
F --> H[Thread enters TERMINATED state]
G --> H
I[Thread Interruption] --> J{Thread.isInterrupted()}
J -->|true| K[InterruptedException handling]
J -->|false| D
style D fill:#4ecdc4
style H fill:#ff6b35
style K fill:#feca57
๐ ๏ธ Advanced Runnable Patterns
1. Interruptible Runnable Pattern:
1public class InterruptibleTask implements Runnable {
2
3 private final String taskName;
4 private final AtomicBoolean cancelled = new AtomicBoolean(false);
5 private volatile boolean running = false;
6
7 public InterruptibleTask(String taskName) {
8 this.taskName = taskName;
9 }
10
11 @Override
12 public void run() {
13 Thread currentThread = Thread.currentThread();
14 running = true;
15
16 try {
17 while (!cancelled.get() && !currentThread.isInterrupted()) {
18 // Check interruption status periodically
19 if (currentThread.isInterrupted()) {
20 System.out.println(taskName + " was interrupted");
21 break;
22 }
23
24 // Simulate work that can be interrupted
25 performInterruptibleWork();
26
27 // Cooperative interruption check
28 if (Thread.interrupted()) {
29 System.out.println(taskName + " detected interruption flag");
30 break;
31 }
32 }
33 } catch (InterruptedException e) {
34 System.out.println(taskName + " interrupted via exception");
35 // Restore interrupt status
36 currentThread.interrupt();
37 } finally {
38 running = false;
39 cleanup();
40 }
41 }
42
43 private void performInterruptibleWork() throws InterruptedException {
44 for (int i = 0; i < 100; i++) {
45 // Simulate CPU-intensive work
46 Math.sqrt(i * Math.PI);
47
48 // Periodic interruption check for CPU-intensive tasks
49 if (i % 10 == 0 && Thread.currentThread().isInterrupted()) {
50 throw new InterruptedException("Work interrupted at iteration " + i);
51 }
52 }
53
54 // Simulate I/O or blocking operation that respects interruption
55 Thread.sleep(50);
56 }
57
58 private void cleanup() {
59 System.out.println(taskName + " performing cleanup");
60 // Release resources, close connections, etc.
61 }
62
63 public void cancel() {
64 cancelled.set(true);
65 }
66
67 public boolean isRunning() {
68 return running;
69 }
70}
2. Parameterized Runnable Pattern:
1public class ParameterizedRunnable<T> implements Runnable {
2
3 private final T parameter;
4 private final Consumer<T> task;
5 private final BiConsumer<T, Exception> errorHandler;
6 private final Consumer<T> completionHandler;
7
8 public ParameterizedRunnable(T parameter,
9 Consumer<T> task,
10 BiConsumer<T, Exception> errorHandler,
11 Consumer<T> completionHandler) {
12 this.parameter = parameter;
13 this.task = task;
14 this.errorHandler = errorHandler;
15 this.completionHandler = completionHandler;
16 }
17
18 @Override
19 public void run() {
20 String threadName = Thread.currentThread().getName();
21 try {
22 System.out.println("Thread " + threadName + " processing: " + parameter);
23 task.accept(parameter);
24
25 if (completionHandler != null) {
26 completionHandler.accept(parameter);
27 }
28
29 } catch (Exception e) {
30 if (errorHandler != null) {
31 errorHandler.accept(parameter, e);
32 } else {
33 System.err.println("Error in thread " + threadName +
34 " processing " + parameter + ": " + e.getMessage());
35 }
36 }
37 }
38
39 // Factory methods for common patterns
40 public static <T> ParameterizedRunnable<T> withLogging(T parameter, Consumer<T> task) {
41 return new ParameterizedRunnable<>(
42 parameter,
43 task,
44 (param, ex) -> System.err.println("Failed processing " + param + ": " + ex.getMessage()),
45 param -> System.out.println("Completed processing " + param)
46 );
47 }
48
49 public static <T> ParameterizedRunnable<T> withRetry(T parameter,
50 Consumer<T> task,
51 int maxRetries) {
52 return new ParameterizedRunnable<>(
53 parameter,
54 createRetryingTask(task, maxRetries),
55 (param, ex) -> System.err.println("Failed after " + maxRetries + " retries: " + ex.getMessage()),
56 null
57 );
58 }
59
60 private static <T> Consumer<T> createRetryingTask(Consumer<T> originalTask, int maxRetries) {
61 return parameter -> {
62 Exception lastException = null;
63 for (int attempt = 1; attempt <= maxRetries; attempt++) {
64 try {
65 originalTask.accept(parameter);
66 return; // Success
67 } catch (Exception e) {
68 lastException = e;
69 if (attempt < maxRetries) {
70 try {
71 Thread.sleep(100 * attempt); // Exponential backoff
72 } catch (InterruptedException ie) {
73 Thread.currentThread().interrupt();
74 throw new RuntimeException("Interrupted during retry", ie);
75 }
76 }
77 }
78 }
79 throw new RuntimeException("Failed after " + maxRetries + " attempts", lastException);
80 };
81 }
82}
3. State-Aware Runnable Pattern:
1public class StatefulRunnable implements Runnable {
2
3 public enum State {
4 CREATED, RUNNING, PAUSED, CANCELLED, COMPLETED, FAILED
5 }
6
7 private volatile State currentState = State.CREATED;
8 private final Object stateLock = new Object();
9 private final CountDownLatch completionLatch = new CountDownLatch(1);
10
11 private final String taskId;
12 private final Runnable actualTask;
13 private Exception failureException;
14
15 public StatefulRunnable(String taskId, Runnable actualTask) {
16 this.taskId = taskId;
17 this.actualTask = actualTask;
18 }
19
20 @Override
21 public void run() {
22 synchronized (stateLock) {
23 if (currentState != State.CREATED) {
24 throw new IllegalStateException("Task has already been executed");
25 }
26 currentState = State.RUNNING;
27 }
28
29 try {
30 actualTask.run();
31
32 synchronized (stateLock) {
33 if (currentState == State.RUNNING) {
34 currentState = State.COMPLETED;
35 }
36 }
37
38 } catch (Exception e) {
39 synchronized (stateLock) {
40 currentState = State.FAILED;
41 failureException = e;
42 }
43 throw e;
44
45 } finally {
46 completionLatch.countDown();
47 }
48 }
49
50 public State getCurrentState() {
51 synchronized (stateLock) {
52 return currentState;
53 }
54 }
55
56 public void pause() {
57 synchronized (stateLock) {
58 if (currentState == State.RUNNING) {
59 currentState = State.PAUSED;
60 }
61 }
62 // Note: Actual pause implementation would require cooperation from the task
63 }
64
65 public void cancel() {
66 synchronized (stateLock) {
67 if (currentState == State.CREATED || currentState == State.RUNNING || currentState == State.PAUSED) {
68 currentState = State.CANCELLED;
69 }
70 }
71 }
72
73 public boolean waitForCompletion(long timeoutMs) throws InterruptedException {
74 return completionLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
75 }
76
77 public Exception getFailureException() {
78 synchronized (stateLock) {
79 return failureException;
80 }
81 }
82
83 public String getTaskId() {
84 return taskId;
85 }
86}
๐ Callable Interface: Advanced Task Execution
๐ Callable Internals and Patterns
The Callable interface extends the concept of Runnable by adding return values and checked exception handling.
1@FunctionalInterface
2public interface Callable<V> {
3 /**
4 * Computes a result, or throws an exception if unable to do so.
5 */
6 V call() throws Exception;
7}
๐ ๏ธ Advanced Callable Patterns
1. Timeout-Aware Callable:
1public class TimeoutAwareCallable<V> implements Callable<V> {
2
3 private final Callable<V> delegate;
4 private final long timeoutMs;
5 private final V timeoutValue;
6 private final boolean throwOnTimeout;
7
8 public TimeoutAwareCallable(Callable<V> delegate, long timeoutMs, V timeoutValue) {
9 this(delegate, timeoutMs, timeoutValue, false);
10 }
11
12 public TimeoutAwareCallable(Callable<V> delegate, long timeoutMs, boolean throwOnTimeout) {
13 this(delegate, timeoutMs, null, throwOnTimeout);
14 }
15
16 private TimeoutAwareCallable(Callable<V> delegate, long timeoutMs, V timeoutValue, boolean throwOnTimeout) {
17 this.delegate = delegate;
18 this.timeoutMs = timeoutMs;
19 this.timeoutValue = timeoutValue;
20 this.throwOnTimeout = throwOnTimeout;
21 }
22
23 @Override
24 public V call() throws Exception {
25 ExecutorService executor = Executors.newSingleThreadExecutor();
26 try {
27 Future<V> future = executor.submit(delegate);
28 return future.get(timeoutMs, TimeUnit.MILLISECONDS);
29
30 } catch (TimeoutException e) {
31 if (throwOnTimeout) {
32 throw new CallableTimeoutException("Callable timed out after " + timeoutMs + "ms", e);
33 }
34 return timeoutValue;
35
36 } finally {
37 executor.shutdownNow();
38 }
39 }
40
41 public static class CallableTimeoutException extends Exception {
42 public CallableTimeoutException(String message, Throwable cause) {
43 super(message, cause);
44 }
45 }
46}
2. Retry-Enabled Callable:
1public class RetryableCallable<V> implements Callable<V> {
2
3 private final Callable<V> delegate;
4 private final int maxRetries;
5 private final long retryDelayMs;
6 private final double backoffMultiplier;
7 private final Predicate<Exception> retryPredicate;
8
9 public RetryableCallable(Callable<V> delegate, int maxRetries) {
10 this(delegate, maxRetries, 1000, 2.0, ex -> true);
11 }
12
13 public RetryableCallable(Callable<V> delegate,
14 int maxRetries,
15 long retryDelayMs,
16 double backoffMultiplier,
17 Predicate<Exception> retryPredicate) {
18 this.delegate = delegate;
19 this.maxRetries = maxRetries;
20 this.retryDelayMs = retryDelayMs;
21 this.backoffMultiplier = backoffMultiplier;
22 this.retryPredicate = retryPredicate;
23 }
24
25 @Override
26 public V call() throws Exception {
27 Exception lastException = null;
28 long currentDelay = retryDelayMs;
29
30 for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
31 try {
32 return delegate.call();
33
34 } catch (Exception e) {
35 lastException = e;
36
37 // Don't retry if this is the last attempt or exception is not retryable
38 if (attempt > maxRetries || !retryPredicate.test(e)) {
39 break;
40 }
41
42 System.out.println("Attempt " + attempt + " failed, retrying in " + currentDelay + "ms: " + e.getMessage());
43
44 try {
45 Thread.sleep(currentDelay);
46 } catch (InterruptedException ie) {
47 Thread.currentThread().interrupt();
48 throw new Exception("Interrupted during retry delay", ie);
49 }
50
51 currentDelay = (long) (currentDelay * backoffMultiplier);
52 }
53 }
54
55 throw new Exception("Failed after " + (maxRetries + 1) + " attempts", lastException);
56 }
57
58 // Factory methods for common retry scenarios
59 public static <V> RetryableCallable<V> withExponentialBackoff(Callable<V> delegate, int maxRetries) {
60 return new RetryableCallable<>(delegate, maxRetries, 100, 2.0, ex -> true);
61 }
62
63 public static <V> RetryableCallable<V> forNetworkOperations(Callable<V> delegate) {
64 return new RetryableCallable<>(
65 delegate,
66 3,
67 500,
68 2.0,
69 ex -> ex instanceof IOException || ex instanceof SocketTimeoutException
70 );
71 }
72}
3. Cached Callable Pattern:
1public class CachedCallable<V> implements Callable<V> {
2
3 private final Callable<V> delegate;
4 private final long cacheValidityMs;
5 private final String cacheKey;
6
7 private volatile CacheEntry<V> cachedResult;
8 private final Object cacheLock = new Object();
9
10 public CachedCallable(Callable<V> delegate, long cacheValidityMs, String cacheKey) {
11 this.delegate = delegate;
12 this.cacheValidityMs = cacheValidityMs;
13 this.cacheKey = cacheKey;
14 }
15
16 @Override
17 public V call() throws Exception {
18 CacheEntry<V> current = cachedResult;
19
20 // Fast path: check if cache is still valid
21 if (current != null && current.isValid()) {
22 return current.value;
23 }
24
25 // Slow path: need to compute new value
26 synchronized (cacheLock) {
27 // Double-check pattern
28 current = cachedResult;
29 if (current != null && current.isValid()) {
30 return current.value;
31 }
32
33 try {
34 V newValue = delegate.call();
35 cachedResult = new CacheEntry<>(newValue, System.currentTimeMillis(), cacheValidityMs);
36 return newValue;
37
38 } catch (Exception e) {
39 // Optionally return stale cache entry on error
40 if (current != null) {
41 System.err.println("Returning stale cached value due to error: " + e.getMessage());
42 return current.value;
43 }
44 throw e;
45 }
46 }
47 }
48
49 public void invalidateCache() {
50 synchronized (cacheLock) {
51 cachedResult = null;
52 }
53 }
54
55 public boolean isCacheValid() {
56 CacheEntry<V> current = cachedResult;
57 return current != null && current.isValid();
58 }
59
60 private static class CacheEntry<V> {
61 private final V value;
62 private final long timestamp;
63 private final long validityMs;
64
65 public CacheEntry(V value, long timestamp, long validityMs) {
66 this.value = value;
67 this.timestamp = timestamp;
68 this.validityMs = validityMs;
69 }
70
71 public boolean isValid() {
72 return System.currentTimeMillis() - timestamp < validityMs;
73 }
74 }
75}
๐ง Java Memory Model and Thread Safety
๐ Understanding the Java Memory Model
The Java Memory Model (JMM) defines how threads interact through memory and what behaviors are allowed in concurrent execution.
graph TD
A[Main Memory] --> B[Thread 1 Local Memory]
A --> C[Thread 2 Local Memory]
B --> D[Thread 1 Execution Engine]
C --> E[Thread 2 Execution Engine]
F[Synchronization Actions] --> G[volatile reads/writes]
F --> H[synchronized blocks]
F --> I[Lock operations]
F --> J[Atomic operations]
G --> K[Memory Visibility]
H --> K
I --> K
J --> K
K --> L[happens-before Relationship]
style A fill:#ff6b35
style K fill:#4ecdc4
style L fill:#feca57
๐ ๏ธ Memory Model Practical Examples
1. Volatile Variables and Visibility:
1public class VolatileExample {
2
3 private volatile boolean running = true;
4 private volatile int counter = 0;
5
6 // Non-volatile variables for comparison
7 private boolean nonVolatileRunning = true;
8 private int nonVolatileCounter = 0;
9
10 public void demonstrateVolatileVisibility() throws InterruptedException {
11 // Worker thread that depends on volatile flag
12 Thread worker = new Thread(() -> {
13 int localCounter = 0;
14 while (running) { // volatile read ensures visibility
15 localCounter++;
16 counter++; // volatile write ensures visibility
17
18 if (localCounter % 1000000 == 0) {
19 System.out.println("Worker thread running, counter: " + counter);
20 }
21 }
22 System.out.println("Worker thread stopped, final counter: " + counter);
23 });
24
25 // Controller thread that modifies the flag
26 Thread controller = new Thread(() -> {
27 try {
28 Thread.sleep(2000);
29 System.out.println("Controller stopping worker thread");
30 running = false; // volatile write ensures visibility
31 } catch (InterruptedException e) {
32 Thread.currentThread().interrupt();
33 }
34 });
35
36 worker.start();
37 controller.start();
38
39 worker.join();
40 controller.join();
41
42 System.out.println("Final counter value: " + counter);
43 }
44
45 // Demonstration of non-volatile behavior (may not terminate!)
46 public void demonstrateNonVolatileBehavior() {
47 Thread worker = new Thread(() -> {
48 while (nonVolatileRunning) { // Non-volatile read - may not see updates!
49 nonVolatileCounter++;
50 }
51 System.out.println("Non-volatile worker stopped");
52 });
53
54 Thread controller = new Thread(() -> {
55 try {
56 Thread.sleep(1000);
57 nonVolatileRunning = false; // Non-volatile write - may not be visible!
58 System.out.println("Non-volatile flag set to false");
59 } catch (InterruptedException e) {
60 Thread.currentThread().interrupt();
61 }
62 });
63
64 worker.start();
65 controller.start();
66
67 // Warning: This may run forever due to lack of visibility!
68 }
69}
2. Synchronized Blocks and Memory Synchronization:
1public class SynchronizedExample {
2
3 private int sharedCounter = 0;
4 private final Object counterLock = new Object();
5
6 private final Map<String, Integer> sharedMap = new HashMap<>();
7 private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
8
9 public void demonstrateSynchronizedBlocks() throws InterruptedException {
10 int threadCount = 10;
11 int incrementsPerThread = 1000;
12
13 List<Thread> threads = new ArrayList<>();
14
15 for (int i = 0; i < threadCount; i++) {
16 Thread t = new Thread(() -> {
17 for (int j = 0; j < incrementsPerThread; j++) {
18 incrementCounterSafely();
19 }
20 });
21 threads.add(t);
22 }
23
24 long startTime = System.nanoTime();
25
26 // Start all threads
27 for (Thread t : threads) {
28 t.start();
29 }
30
31 // Wait for all threads to complete
32 for (Thread t : threads) {
33 t.join();
34 }
35
36 long duration = System.nanoTime() - startTime;
37
38 System.out.println("Final counter value: " + sharedCounter);
39 System.out.println("Expected value: " + (threadCount * incrementsPerThread));
40 System.out.println("Duration: " + duration / 1_000_000 + " ms");
41 }
42
43 private void incrementCounterSafely() {
44 synchronized (counterLock) {
45 // Critical section - only one thread can execute this at a time
46 int temp = sharedCounter;
47 temp = temp + 1;
48 sharedCounter = temp;
49
50 // Memory synchronization happens at the end of synchronized block
51 }
52 // happens-before relationship established with subsequent synchronized blocks
53 }
54
55 public void demonstrateReadWriteLocks() throws InterruptedException {
56 int readerCount = 5;
57 int writerCount = 2;
58
59 CountDownLatch startLatch = new CountDownLatch(1);
60 CountDownLatch completionLatch = new CountDownLatch(readerCount + writerCount);
61
62 // Reader threads
63 for (int i = 0; i < readerCount; i++) {
64 final int readerId = i;
65 Thread reader = new Thread(() -> {
66 try {
67 startLatch.await();
68
69 for (int j = 0; j < 10; j++) {
70 readFromMap("reader-" + readerId + "-key-" + j);
71 Thread.sleep(10);
72 }
73
74 } catch (InterruptedException e) {
75 Thread.currentThread().interrupt();
76 } finally {
77 completionLatch.countDown();
78 }
79 });
80 reader.start();
81 }
82
83 // Writer threads
84 for (int i = 0; i < writerCount; i++) {
85 final int writerId = i;
86 Thread writer = new Thread(() -> {
87 try {
88 startLatch.await();
89
90 for (int j = 0; j < 5; j++) {
91 writeToMap("writer-" + writerId + "-key-" + j, j);
92 Thread.sleep(50);
93 }
94
95 } catch (InterruptedException e) {
96 Thread.currentThread().interrupt();
97 } finally {
98 completionLatch.countDown();
99 }
100 });
101 writer.start();
102 }
103
104 startLatch.countDown(); // Start all threads
105 completionLatch.await(); // Wait for completion
106
107 System.out.println("Map size after concurrent operations: " + sharedMap.size());
108 }
109
110 private Integer readFromMap(String key) {
111 mapLock.readLock().lock();
112 try {
113 Integer value = sharedMap.get(key);
114 System.out.println(Thread.currentThread().getName() + " read: " + key + " = " + value);
115 return value;
116 } finally {
117 mapLock.readLock().unlock();
118 }
119 }
120
121 private void writeToMap(String key, Integer value) {
122 mapLock.writeLock().lock();
123 try {
124 sharedMap.put(key, value);
125 System.out.println(Thread.currentThread().getName() + " wrote: " + key + " = " + value);
126 } finally {
127 mapLock.writeLock().unlock();
128 }
129 }
130}
๐ Advanced Synchronization Patterns
๐ฆ CountDownLatch Pattern
1public class CountDownLatchExample {
2
3 public void demonstrateServiceStartup() throws InterruptedException {
4 int serviceCount = 5;
5 CountDownLatch startupLatch = new CountDownLatch(serviceCount);
6 CountDownLatch readyLatch = new CountDownLatch(1);
7
8 List<ServiceSimulator> services = new ArrayList<>();
9
10 // Create and start services
11 for (int i = 0; i < serviceCount; i++) {
12 ServiceSimulator service = new ServiceSimulator("Service-" + i, startupLatch, readyLatch);
13 services.add(service);
14 new Thread(service).start();
15 }
16
17 System.out.println("Waiting for all services to start...");
18
19 // Wait for all services to be ready
20 startupLatch.await();
21
22 System.out.println("All services started! Signaling ready state...");
23
24 // Signal all services to begin processing
25 readyLatch.countDown();
26
27 // Wait a bit to see services processing
28 Thread.sleep(3000);
29
30 // Shutdown services
31 services.forEach(ServiceSimulator::shutdown);
32 }
33
34 private static class ServiceSimulator implements Runnable {
35 private final String serviceName;
36 private final CountDownLatch startupLatch;
37 private final CountDownLatch readyLatch;
38 private volatile boolean running = true;
39
40 public ServiceSimulator(String serviceName, CountDownLatch startupLatch, CountDownLatch readyLatch) {
41 this.serviceName = serviceName;
42 this.startupLatch = startupLatch;
43 this.readyLatch = readyLatch;
44 }
45
46 @Override
47 public void run() {
48 try {
49 // Simulate startup time
50 Thread.sleep((long) (Math.random() * 2000) + 500);
51
52 System.out.println(serviceName + " started and ready");
53 startupLatch.countDown();
54
55 // Wait for all services to be ready
56 readyLatch.await();
57
58 // Begin processing
59 System.out.println(serviceName + " beginning processing");
60
61 while (running) {
62 // Simulate work
63 Thread.sleep(500);
64 System.out.println(serviceName + " processing...");
65 }
66
67 System.out.println(serviceName + " shutting down");
68
69 } catch (InterruptedException e) {
70 Thread.currentThread().interrupt();
71 System.out.println(serviceName + " interrupted");
72 }
73 }
74
75 public void shutdown() {
76 running = false;
77 }
78 }
79}
๐ CyclicBarrier Pattern
1public class CyclicBarrierExample {
2
3 public void demonstrateMultiPhaseComputation() throws InterruptedException, BrokenBarrierException {
4 int workerCount = 4;
5 int phaseCount = 3;
6
7 CyclicBarrier phaseBarrier = new CyclicBarrier(workerCount, () -> {
8 System.out.println("Phase completed by all workers!");
9 });
10
11 CountDownLatch completionLatch = new CountDownLatch(workerCount);
12
13 for (int i = 0; i < workerCount; i++) {
14 final int workerId = i;
15 Thread worker = new Thread(new PhaseWorker(workerId, phaseCount, phaseBarrier, completionLatch));
16 worker.start();
17 }
18
19 completionLatch.await();
20 System.out.println("All workers completed all phases!");
21 }
22
23 private static class PhaseWorker implements Runnable {
24 private final int workerId;
25 private final int phaseCount;
26 private final CyclicBarrier barrier;
27 private final CountDownLatch completionLatch;
28
29 public PhaseWorker(int workerId, int phaseCount, CyclicBarrier barrier, CountDownLatch completionLatch) {
30 this.workerId = workerId;
31 this.phaseCount = phaseCount;
32 this.barrier = barrier;
33 this.completionLatch = completionLatch;
34 }
35
36 @Override
37 public void run() {
38 try {
39 for (int phase = 1; phase <= phaseCount; phase++) {
40 // Simulate work for this phase
41 long workTime = (long) (Math.random() * 1000) + 500;
42 Thread.sleep(workTime);
43
44 System.out.println("Worker " + workerId + " completed phase " + phase +
45 " (took " + workTime + "ms)");
46
47 // Wait for all workers to complete this phase
48 barrier.await();
49
50 System.out.println("Worker " + workerId + " proceeding to next phase");
51 }
52
53 System.out.println("Worker " + workerId + " finished all phases");
54
55 } catch (InterruptedException | BrokenBarrierException e) {
56 System.err.println("Worker " + workerId + " interrupted: " + e.getMessage());
57 Thread.currentThread().interrupt();
58 } finally {
59 completionLatch.countDown();
60 }
61 }
62 }
63}
๐ซ Semaphore Pattern
1public class SemaphoreExample {
2
3 private final Semaphore connectionPool;
4 private final List<Connection> connections;
5 private final Queue<Connection> availableConnections;
6
7 public SemaphoreExample(int poolSize) {
8 this.connectionPool = new Semaphore(poolSize, true); // Fair semaphore
9 this.connections = new ArrayList<>();
10 this.availableConnections = new ConcurrentLinkedQueue<>();
11
12 // Initialize connection pool
13 for (int i = 0; i < poolSize; i++) {
14 Connection conn = new MockConnection("Connection-" + i);
15 connections.add(conn);
16 availableConnections.offer(conn);
17 }
18 }
19
20 public Connection acquireConnection() throws InterruptedException {
21 connectionPool.acquire(); // Wait for available permit
22
23 Connection connection = availableConnections.poll();
24 if (connection == null) {
25 // This shouldn't happen if semaphore is working correctly
26 throw new IllegalStateException("No connection available despite acquired permit");
27 }
28
29 System.out.println(Thread.currentThread().getName() + " acquired " + connection.getId());
30 return connection;
31 }
32
33 public Connection acquireConnectionWithTimeout(long timeoutMs) throws InterruptedException {
34 boolean acquired = connectionPool.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
35 if (!acquired) {
36 throw new RuntimeException("Could not acquire connection within " + timeoutMs + "ms");
37 }
38
39 Connection connection = availableConnections.poll();
40 if (connection == null) {
41 connectionPool.release(); // Release permit if no connection available
42 throw new IllegalStateException("No connection available despite acquired permit");
43 }
44
45 System.out.println(Thread.currentThread().getName() + " acquired " + connection.getId() + " with timeout");
46 return connection;
47 }
48
49 public void releaseConnection(Connection connection) {
50 if (connection != null && connections.contains(connection)) {
51 availableConnections.offer(connection);
52 connectionPool.release();
53 System.out.println(Thread.currentThread().getName() + " released " + connection.getId());
54 }
55 }
56
57 public int getAvailableConnections() {
58 return connectionPool.availablePermits();
59 }
60
61 public void demonstrateConnectionPool() throws InterruptedException {
62 int clientCount = 10;
63 CountDownLatch completionLatch = new CountDownLatch(clientCount);
64
65 for (int i = 0; i < clientCount; i++) {
66 final int clientId = i;
67 Thread client = new Thread(() -> {
68 try {
69 // Acquire connection
70 Connection conn = acquireConnection();
71
72 // Simulate work with connection
73 Thread.sleep((long) (Math.random() * 2000) + 500);
74
75 // Release connection
76 releaseConnection(conn);
77
78 } catch (InterruptedException e) {
79 Thread.currentThread().interrupt();
80 System.err.println("Client " + clientId + " interrupted");
81 } finally {
82 completionLatch.countDown();
83 }
84 }, "Client-" + clientId);
85
86 client.start();
87 }
88
89 completionLatch.await();
90 System.out.println("All clients completed. Available connections: " + getAvailableConnections());
91 }
92
93 // Mock connection class for demonstration
94 private static class MockConnection implements Connection {
95 private final String id;
96
97 public MockConnection(String id) {
98 this.id = id;
99 }
100
101 public String getId() {
102 return id;
103 }
104
105 // Implement other Connection methods as no-ops for demo
106 @Override public Statement createStatement() { return null; }
107 @Override public PreparedStatement prepareStatement(String sql) { return null; }
108 @Override public CallableStatement prepareCall(String sql) { return null; }
109 @Override public String nativeSQL(String sql) { return null; }
110 @Override public void setAutoCommit(boolean autoCommit) {}
111 @Override public boolean getAutoCommit() { return false; }
112 @Override public void commit() {}
113 @Override public void rollback() {}
114 @Override public void close() {}
115 @Override public boolean isClosed() { return false; }
116 // ... other methods omitted for brevity
117 }
118}
๐งช Testing Concurrent Code
๐ Concurrency Testing Strategies
1@Component
2public class ConcurrencyTestingExample {
3
4 private final AtomicInteger concurrentCounter = new AtomicInteger(0);
5 private int nonAtomicCounter = 0;
6 private final Object counterLock = new Object();
7
8 /**
9 * Test race conditions with high contention
10 */
11 public void testRaceConditions() throws InterruptedException {
12 int threadCount = 100;
13 int incrementsPerThread = 1000;
14 int expectedTotal = threadCount * incrementsPerThread;
15
16 // Test 1: Non-atomic counter (will have race conditions)
17 testNonAtomicCounter(threadCount, incrementsPerThread, expectedTotal);
18
19 // Test 2: Atomic counter (should be correct)
20 testAtomicCounter(threadCount, incrementsPerThread, expectedTotal);
21
22 // Test 3: Synchronized counter (should be correct but slower)
23 testSynchronizedCounter(threadCount, incrementsPerThread, expectedTotal);
24 }
25
26 private void testNonAtomicCounter(int threadCount, int incrementsPerThread, int expectedTotal)
27 throws InterruptedException {
28
29 nonAtomicCounter = 0;
30 CountDownLatch startLatch = new CountDownLatch(1);
31 CountDownLatch completionLatch = new CountDownLatch(threadCount);
32
33 List<Thread> threads = new ArrayList<>();
34
35 for (int i = 0; i < threadCount; i++) {
36 Thread t = new Thread(() -> {
37 try {
38 startLatch.await(); // Wait for all threads to be ready
39
40 for (int j = 0; j < incrementsPerThread; j++) {
41 nonAtomicCounter++; // Race condition here!
42 }
43
44 } catch (InterruptedException e) {
45 Thread.currentThread().interrupt();
46 } finally {
47 completionLatch.countDown();
48 }
49 });
50 threads.add(t);
51 t.start();
52 }
53
54 long startTime = System.nanoTime();
55 startLatch.countDown(); // Start all threads simultaneously
56 completionLatch.await(); // Wait for completion
57 long duration = System.nanoTime() - startTime;
58
59 System.out.println("Non-atomic counter test:");
60 System.out.println(" Expected: " + expectedTotal);
61 System.out.println(" Actual: " + nonAtomicCounter);
62 System.out.println(" Lost increments: " + (expectedTotal - nonAtomicCounter));
63 System.out.println(" Duration: " + duration / 1_000_000 + " ms");
64 System.out.println();
65 }
66
67 private void testAtomicCounter(int threadCount, int incrementsPerThread, int expectedTotal)
68 throws InterruptedException {
69
70 concurrentCounter.set(0);
71 CountDownLatch startLatch = new CountDownLatch(1);
72 CountDownLatch completionLatch = new CountDownLatch(threadCount);
73
74 List<Thread> threads = new ArrayList<>();
75
76 for (int i = 0; i < threadCount; i++) {
77 Thread t = new Thread(() -> {
78 try {
79 startLatch.await();
80
81 for (int j = 0; j < incrementsPerThread; j++) {
82 concurrentCounter.incrementAndGet(); // Thread-safe atomic operation
83 }
84
85 } catch (InterruptedException e) {
86 Thread.currentThread().interrupt();
87 } finally {
88 completionLatch.countDown();
89 }
90 });
91 threads.add(t);
92 t.start();
93 }
94
95 long startTime = System.nanoTime();
96 startLatch.countDown();
97 completionLatch.await();
98 long duration = System.nanoTime() - startTime;
99
100 System.out.println("Atomic counter test:");
101 System.out.println(" Expected: " + expectedTotal);
102 System.out.println(" Actual: " + concurrentCounter.get());
103 System.out.println(" Correct: " + (concurrentCounter.get() == expectedTotal));
104 System.out.println(" Duration: " + duration / 1_000_000 + " ms");
105 System.out.println();
106 }
107
108 private void testSynchronizedCounter(int threadCount, int incrementsPerThread, int expectedTotal)
109 throws InterruptedException {
110
111 int synchronizedCounter = 0;
112 CountDownLatch startLatch = new CountDownLatch(1);
113 CountDownLatch completionLatch = new CountDownLatch(threadCount);
114 AtomicInteger syncCounter = new AtomicInteger(0);
115
116 List<Thread> threads = new ArrayList<>();
117
118 for (int i = 0; i < threadCount; i++) {
119 Thread t = new Thread(() -> {
120 try {
121 startLatch.await();
122
123 for (int j = 0; j < incrementsPerThread; j++) {
124 synchronized (counterLock) {
125 syncCounter.incrementAndGet();
126 }
127 }
128
129 } catch (InterruptedException e) {
130 Thread.currentThread().interrupt();
131 } finally {
132 completionLatch.countDown();
133 }
134 });
135 threads.add(t);
136 t.start();
137 }
138
139 long startTime = System.nanoTime();
140 startLatch.countDown();
141 completionLatch.await();
142 long duration = System.nanoTime() - startTime;
143
144 System.out.println("Synchronized counter test:");
145 System.out.println(" Expected: " + expectedTotal);
146 System.out.println(" Actual: " + syncCounter.get());
147 System.out.println(" Correct: " + (syncCounter.get() == expectedTotal));
148 System.out.println(" Duration: " + duration / 1_000_000 + " ms");
149 System.out.println();
150 }
151
152 /**
153 * Test for deadlock detection
154 */
155 public void testDeadlockScenario() throws InterruptedException {
156 Object lock1 = new Object();
157 Object lock2 = new Object();
158 CountDownLatch deadlockLatch = new CountDownLatch(2);
159
160 // Thread 1: acquires lock1, then tries to acquire lock2
161 Thread thread1 = new Thread(() -> {
162 synchronized (lock1) {
163 System.out.println("Thread 1: Acquired lock1");
164
165 try {
166 Thread.sleep(100); // Give thread2 time to acquire lock2
167 } catch (InterruptedException e) {
168 Thread.currentThread().interrupt();
169 return;
170 }
171
172 System.out.println("Thread 1: Trying to acquire lock2");
173 synchronized (lock2) {
174 System.out.println("Thread 1: Acquired lock2");
175 }
176 }
177 deadlockLatch.countDown();
178 });
179
180 // Thread 2: acquires lock2, then tries to acquire lock1
181 Thread thread2 = new Thread(() -> {
182 synchronized (lock2) {
183 System.out.println("Thread 2: Acquired lock2");
184
185 try {
186 Thread.sleep(100); // Give thread1 time to acquire lock1
187 } catch (InterruptedException e) {
188 Thread.currentThread().interrupt();
189 return;
190 }
191
192 System.out.println("Thread 2: Trying to acquire lock1");
193 synchronized (lock1) {
194 System.out.println("Thread 2: Acquired lock1");
195 }
196 }
197 deadlockLatch.countDown();
198 });
199
200 thread1.start();
201 thread2.start();
202
203 // Wait for potential deadlock with timeout
204 boolean completed = deadlockLatch.await(5, TimeUnit.SECONDS);
205
206 if (!completed) {
207 System.out.println("Deadlock detected! Threads did not complete within timeout.");
208
209 // Interrupt threads to break deadlock
210 thread1.interrupt();
211 thread2.interrupt();
212 } else {
213 System.out.println("No deadlock - both threads completed successfully.");
214 }
215 }
216}
๐ Performance Analysis Tools
๐ Benchmarking Concurrent Operations
1@Component
2public class ConcurrencyBenchmark {
3
4 private static final int OPERATIONS_COUNT = 1_000_000;
5
6 public void benchmarkConcurrencyMechanisms() {
7 System.out.println("Benchmarking concurrency mechanisms with " + OPERATIONS_COUNT + " operations...\n");
8
9 benchmarkAtomicOperations();
10 benchmarkSynchronizedOperations();
11 benchmarkLockOperations();
12 benchmarkVolatileOperations();
13 }
14
15 private void benchmarkAtomicOperations() {
16 AtomicInteger atomicCounter = new AtomicInteger(0);
17
18 long startTime = System.nanoTime();
19
20 IntStream.range(0, OPERATIONS_COUNT)
21 .parallel()
22 .forEach(i -> atomicCounter.incrementAndGet());
23
24 long duration = System.nanoTime() - startTime;
25
26 System.out.println("Atomic Operations Benchmark:");
27 System.out.println(" Operations: " + OPERATIONS_COUNT);
28 System.out.println(" Final value: " + atomicCounter.get());
29 System.out.println(" Duration: " + duration / 1_000_000 + " ms");
30 System.out.println(" Ops/second: " + (OPERATIONS_COUNT * 1_000_000_000L / duration));
31 System.out.println();
32 }
33
34 private void benchmarkSynchronizedOperations() {
35 Counter synchronizedCounter = new SynchronizedCounter();
36
37 long startTime = System.nanoTime();
38
39 IntStream.range(0, OPERATIONS_COUNT)
40 .parallel()
41 .forEach(i -> synchronizedCounter.increment());
42
43 long duration = System.nanoTime() - startTime;
44
45 System.out.println("Synchronized Operations Benchmark:");
46 System.out.println(" Operations: " + OPERATIONS_COUNT);
47 System.out.println(" Final value: " + synchronizedCounter.getValue());
48 System.out.println(" Duration: " + duration / 1_000_000 + " ms");
49 System.out.println(" Ops/second: " + (OPERATIONS_COUNT * 1_000_000_000L / duration));
50 System.out.println();
51 }
52
53 private void benchmarkLockOperations() {
54 Counter lockCounter = new LockCounter();
55
56 long startTime = System.nanoTime();
57
58 IntStream.range(0, OPERATIONS_COUNT)
59 .parallel()
60 .forEach(i -> lockCounter.increment());
61
62 long duration = System.nanoTime() - startTime;
63
64 System.out.println("Lock Operations Benchmark:");
65 System.out.println(" Operations: " + OPERATIONS_COUNT);
66 System.out.println(" Final value: " + lockCounter.getValue());
67 System.out.println(" Duration: " + duration / 1_000_000 + " ms");
68 System.out.println(" Ops/second: " + (OPERATIONS_COUNT * 1_000_000_000L / duration));
69 System.out.println();
70 }
71
72 private void benchmarkVolatileOperations() {
73 VolatileCounter volatileCounter = new VolatileCounter();
74
75 long startTime = System.nanoTime();
76
77 // Note: This will have race conditions, but we're testing volatile performance
78 IntStream.range(0, OPERATIONS_COUNT)
79 .parallel()
80 .forEach(i -> volatileCounter.increment());
81
82 long duration = System.nanoTime() - startTime;
83
84 System.out.println("Volatile Operations Benchmark (with race conditions):");
85 System.out.println(" Operations: " + OPERATIONS_COUNT);
86 System.out.println(" Final value: " + volatileCounter.getValue());
87 System.out.println(" Lost increments: " + (OPERATIONS_COUNT - volatileCounter.getValue()));
88 System.out.println(" Duration: " + duration / 1_000_000 + " ms");
89 System.out.println(" Ops/second: " + (OPERATIONS_COUNT * 1_000_000_000L / duration));
90 System.out.println();
91 }
92
93 interface Counter {
94 void increment();
95 int getValue();
96 }
97
98 static class SynchronizedCounter implements Counter {
99 private int value = 0;
100
101 @Override
102 public synchronized void increment() {
103 value++;
104 }
105
106 @Override
107 public synchronized int getValue() {
108 return value;
109 }
110 }
111
112 static class LockCounter implements Counter {
113 private int value = 0;
114 private final ReentrantLock lock = new ReentrantLock();
115
116 @Override
117 public void increment() {
118 lock.lock();
119 try {
120 value++;
121 } finally {
122 lock.unlock();
123 }
124 }
125
126 @Override
127 public int getValue() {
128 lock.lock();
129 try {
130 return value;
131 } finally {
132 lock.unlock();
133 }
134 }
135 }
136
137 static class VolatileCounter implements Counter {
138 private volatile int value = 0;
139
140 @Override
141 public void increment() {
142 value++; // Race condition! Not atomic despite volatile
143 }
144
145 @Override
146 public int getValue() {
147 return value;
148 }
149 }
150}
๐ฏ Conclusion and Advanced Patterns Summary
๐ Key Takeaways from Deep Dive
This comprehensive deep dive into Java concurrency fundamentals reveals several critical insights:
Runnable Patterns: Beyond simple task execution, advanced patterns like interruptible, parameterized, and stateful runnables provide robust foundations for complex concurrent systems.
Callable Advanced Usage: Timeout-aware, retry-enabled, and cached callables demonstrate how to build resilient distributed systems with proper error handling and performance optimization.
Memory Model Mastery: Understanding the Java Memory Model is crucial for writing correct concurrent code, especially regarding visibility, ordering, and synchronization.
Synchronization Mechanisms: Different synchronization primitives serve different purposes - choose based on contention patterns, performance requirements, and complexity needs.
๐ Best Practices for Advanced Concurrency
graph TD
A[Advanced Concurrency Principles] --> B[Choose Right Abstraction]
A --> C[Understand Memory Model]
A --> D[Test Thoroughly]
A --> E[Monitor Performance]
B --> F[Simple Tasks: Runnable]
B --> G[Return Values: Callable]
B --> H[Complex Flows: CompletableFuture]
C --> I[Volatile for Flags]
C --> J[Synchronized for Mutual Exclusion]
C --> K[Atomic for Counters]
D --> L[Race Condition Tests]
D --> M[Deadlock Detection]
D --> N[Load Testing]
E --> O[Thread Pool Metrics]
E --> P[Contention Analysis]
E --> Q[Memory Usage]
style A fill:#ff6b6b
style C fill:#4ecdc4
style E fill:#feca57
By mastering these fundamental patterns and understanding their internal mechanisms, you can build highly efficient, scalable, and maintainable concurrent Java applications. Remember that concurrency is not just about making things run in parallel - it’s about designing systems that can safely and efficiently coordinate multiple simultaneous operations while maintaining correctness and performance.
The patterns and techniques covered in this deep dive form the foundation for more advanced concurrency concepts like reactive programming, actor models, and distributed computing frameworks. Master these basics first, and you’ll be well-equipped to tackle any concurrent programming challenge in Java.