[Java Concurrent Programming] summary of common tools: countdownlatch, cyclicbarrier, semphore, exchange
CountDownLatch
Countdownlatch allows one or more threads to wait for other threads to complete operations. Similar to the operation of join, analogy can be made:
public class JoinCDLT {
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
System.out.println("parser1 start");
Thread.sleep(5000);
System.out.println("parser1 finish");
}
});
Thread parser2 = new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
System.out.println("parser2 start");
Thread.sleep(10000);
System.out.println("parser2 finish");
}
});
long start = System.currentTimeMillis();
parser1.start();
parser2.start();
//join用于让当前执行线程等待join线程执行结束
parser1.join();
parser2.join();
long end = System.currentTimeMillis();
System.out.println("all parser finish,spend " + (end - start) + " ms");
}
}
public class CountDownLatchTest {
//传入int参数作为计数器
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
Thread a = new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(3000);
System.out.println(1);
c.countDown();//每当调用该方法,计数器N - 1
}
});
Thread b = new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(3000);
System.out.println(2);
c.countDown();
}
});
long start = System.currentTimeMillis();
a.start();
b.start();
//await方法会阻塞当前线程
c.await();
long end = System.currentTimeMillis();
System.out.println(3 + " " + (end - start));
}
}
The counter must be greater than or equal to 0, but when it is equal to 0, the counter is 0. When await method is called, the current thread will not be blocked. Countdownlatch cannot reinitialize or modify the value of the internal counter of the countdownlatch object. One thread calls the countdown method happen before, and the other thread calls the await method.
CyclicBarrier
Cyclicbarrier allows a group of threads to be blocked when they reach a barrier [synchronization point]. The barrier will not open until the last thread reaches the barrier, and all threads intercepted by the barrier will continue to run.
public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
//线程调用await,告诉CyclicBarrier已经到达了屏障,然后当前线程被阻塞
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(1);
}
},"thread-1");
thread.start();
try {
//主线程到达了屏障,因为设置了parties设置为2,因此可以继续下去
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(2);
}
}
The output result of the above program may be in the order of first 1 and then 2, or first 2 and then 1. The reason is that the scheduling of the main thread and sub thread is determined by the CPU, and both threads may be executed first.
However, if the number of barriers is changed to 3, the main thread and sub thread will wait forever, because no third thread has reached the barrier.
The difference between cyclicbarrier and countdownlatch
Implementation of phaser
Phaser can replace countdownlatch and cyclicbarrier, but it is more powerful than both. It can dynamically adjust the number of threads required. It can pass in the parent phaser through the constructor to realize the hierarchical phaser.
reference resources: https://segmentfault.com/a/1190000015979879
Semaphore
Semaphore can be used to control the number of threads accessing specific resources at the same time. It coordinates each thread to ensure the rational use of public resources. The applicable scenario can be traffic control.
The AQS implementation is used. The AQS state variable state is used as the number of licenses. Each time through acquire() / tryacquire(), the number of licenses decreases through CAS atomicity, and the release() is called to release the license. The atomicity increases. As long as there is a license, it can be reused.
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);//10个许可证数量,最大并发数为10
public static void main(String[] args) {
for(int i = 0; i < THREAD_COUNT; i ++){ //执行30个线程
threadPool.execute(new Runnable() {
@Override
public void run() {
s.tryAcquire(); //尝试获取一个许可证
System.out.println("save data");
s.release(); //使用完之后归还许可证
}
});
}
threadPool.shutdown();
}
}
Exchange principle
It is used for data exchange between threads. It provides a synchronization point where two threads can exchange data with each other.
If the first thread executes the exchange () method first, it will wait until the second thread also executes the exchange method. When both threads reach the synchronization point, the two threads can exchange data.
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@SneakyThrows
@Override
public void run() {
String A = "银行流水A";
exgr.exchange(A);
}
});
threadPool.execute(new Runnable() {
@SneakyThrows
@Override
public void run() {
String B = "银行流水B";
String A = exgr.exchange(B);
System.out.println("A 和 B 的数据是否一致 :"
+ A.equals(B) + " A录入的数据为 :" + A + " B录入的数据为 :" + B);
}
});
threadPool.shutdown();
}
}
reference resources