π Hi Developer! Welcome to part 2 of the Mastering Concurrency in Java series! In this blog, we'll explore some advanced multithreading topics like Reentrant Locks, Semaphores, and CountDown Latches.
We have covered the basic topics in part 1, so be sure to check them out if you haven't already. Let's dive in and explore these concepts in depth! π
π Table of Contents
π Reentrant Lock
- It is the alternative to using the
synchronized
keyword. - Re-entrant Lock as the name suggests can be re-acquired (lock) again N number of times by the same thread.
It has to call
unlock
the same number of times to completely release the lock.Only one thread can lock a lock at any given time.
π Example:
import java.util.Scanner;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Runner {
private int count = 0;
private Lock lock =new ReentrantLock();
private void increment() {
for (int i = 0; i < 10000; i++) {
count++;
}
}
public void firstThread() throws InterruptedException {
lock.lock();
try {
increment();
} finally {
lock.unlock();
}
}
public void secondThread() throws InterruptedException {
lock.lock();
try {
increment();
} finally {
lock.unlock();
}
}
public void finished() {
System.out.println("Count is: " + count);
}
}
I'll run these 2 methods from 2 different thread and check for the total count;
public class App {
public static void main(String[] args) throws Exception {
final Runner runner = new Runner();
Thread t1 = new Thread(new Runnable() {
public void run() {
try {
runner.firstThread();
} catch (InterruptedException ignored) {
}
}
});
Thread t2 = new Thread(new Runnable() {
public void run() {
try {
runner.secondThread();
} catch (InterruptedException ignored) {
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
runner.finished();
}
}
Result
Count is: 20000
βIntroduction to CountDownLatch
Sometimes we need to ensure that a certain number of tasks are completed before proceeding with other parts of our application. CountDownLatch
provides a mechanism to handle such scenarios.
Initialization: When using
CountDownLatch
, we first initialize it with a count representing the number of tasks or threads that need to be completed before the waiting threads can proceed.CountDown: As each task or thread completes its work, it calls the
countDown()
method of theCountDownLatch
. This method decrements the internal count by 1.Blocking Await: Threads that are waiting for tasks to complete call the
await()
method of theCountDownLatch
. This method blocks the thread until all the threas complete their task ie. internal count reaches zero.Signaling Completion: Once the internal count reaches zero, the waiting threads are unblocked, and they can proceed with their respective tasks or operations.
βπΌExample:
We will create 5 multiple worker threads to perform tasks concurrently, and a main thread needs to wait for all tasks to complete before continuing its execution.
import java.util.concurrent.CountDownLatch;
public class Worker implements Runnable{
private int workerNumber;
CountDownLatch count;
public Worker(CountDownLatch count, int workerNumber){
this.workerNumber= workerNumber;
this.count = count;
}
@Override
public void run() {
System.out.println("Worker " + workerNumber + " is working");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Worker " + workerNumber + " completed working");
count.countDown();
}
}
I'll call the Worker run
method from the 5 different Threads from the main
method
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class App {
public static void main (String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executor.submit(new Worker(latch,i));
}
latch.await();
System.out.println("All work is completed");
executor.shutdown();
executor.awaitTermination(1 , TimeUnit.DAYS);
}
}
βπΌExplanation:
- Five instances of the
Worker
class are created, each with a unique worker number (1, 2, 3, etc.) and a reference to theCountDownLatch
. - Each worker begins its execution and simulates work by sleeping for 2 seconds.
- After completing the work, each worker prints a message indicating it has finished and decreases the latch count by 1 using
countDown()
. - The main thread waits for all workers to complete their tasks by calling
latch.await()
. - Once all workers have finished their tasks and the latch count reaches zero, the main thread proceeds.
- The main thread prints a message indicating that all workers have completed their tasks.
- Finally, the executor service is shut down.
π This is the result:
π What are Semaphores
Semaphores allow you to control how many threads can access a resource simultaneously.
It constrains access to at most N threads, to control/limit concurrent access to a shared resource.
βοΈ Example:
- Let's say we want to connect to a remote server that can only accept N number of connections at a time.
import java.util.concurrent.Semaphore;
public class Connection {
public static Connection instance = new Connection();
private int connections = 0;
Semaphore sem = new Semaphore(10, true);
private Connection() {
}
public static Connection getInstance() {
return instance;
}
public void doConnect() {
synchronized (this) {
connections++;
System.out.println("connection number " + connections);
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (this) {
connections--;
}
}
public void connect() {
try {
sem.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
doConnect();
} finally {
sem.release();
}
}
}
I'll Call the connect method from the 200 Threads from the main Method.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class App {
public static void main (String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
for(int i=0;i<200;i++){
executor.submit(new Runnable() {
@Override
public void run() {
Connection.getInstance().connect();
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);
}
}
If you see the call stack of each Thread it will be
- 1. Call the
connect
method: - 2. Acquire the Semaphore permit if available.
- 3. connection ++ ;
- 4. Make Thread sleep for 2 sec.
- 5. Release the permit.
Now Semaphore will allow up to 10 permits at a time.
π Results:
Thank you for reading this blog! I hope you found it insightful and valuable in your journey to mastering concurrency in Java. π
Feel free to share your thoughts in the comments below. I'm eager to learn and improve! π π
Top comments (0)