CountDownLatch、 CyclicBarrier 与 Semaphore 都是 Java 的并发工具,其中 CountDownLatch 是减法计数器,CyclicBarrier 是循环栅栏,现在举例如下。
并发工具类一:CountDownLatch
CountDownLatch 用于需要等待其它业务线程执行完成以后,主线程才能接着执行的场景。一般的用法是:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CountDownLatchDemo {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(5);
ExecutorService executorService = new ThreadPoolExecutor(6, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 6; i++) {
final int num = i + 1;
Runnable runnable = () -> {
try {
Thread.sleep(num * 1000);
System.out.println("运动员 " + num + " 到达了终点。");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
latch.countDown();
}
};
executorService.submit(runnable);
}
try {
System.out.println("裁判员一声令下:各就位!");
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("主线程结束。");
executorService.shutdown();
}
}
执行结果:
裁判员一声令下:各就位!
运动员 1 到达了终点。
运动员 2 到达了终点。
运动员 3 到达了终点。
运动员 4 到达了终点。
运动员 5 到达了终点。
主线程结束。
运动员 6 到达了终点。
区别于 CyclicBarrier ,CountDownLatch 只能使用一次。
并发工具类二:CyclicBarrier
CyclicBarrier 是 Java 并发包中的一个同步工具类,它允许一组线程互相等待,直到所有线程都到达某个屏障点,然后再一起继续执行。它的构造方法接收一个整数作为参数,表示需要等待的线程数量。
当一个线程调用 CyclicBarrier 的 await() 方法时,它会被阻塞,直到所有线程都调用了该方法。一旦所有线程都到达屏障点,所有线程都会被释放,同时 CyclicBarrier 的计数器会被重置,可以被重复使用。
CyclicBarrier 的核心逻辑是 barrierAction,即当所有线程到达屏障点时,要执行的任务。barrierAction 是一个可选参数,在 CyclicBarrier 的构造方法中可以指定。如果指定了 barrierAction,那么当所有线程都到达屏障点时,会先执行 barrierAction 中的任务,然后再释放所有线程。如果没有指定 barrierAction,那么所有线程到达屏障点后直接释放。
barrierAction 可以用来执行一些在所有线程都到达屏障点之前必须完成的操作,比如检查所有线程的运行结果并进行汇总。如果 barrierAction 中的任务抛出异常,那么所有线程都会立即中断并抛出 BrokenBarrierException 异常。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierDemo {
private static final int MIN_CLASS_NUMBER = 5;
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(MIN_CLASS_NUMBER, () -> System.out.println("人数到达开班要求,开始排课。"));
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(6, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < MIN_CLASS_NUMBER; i++) {
executorService.execute(new LearnFromLeetCode(cyclicBarrier, "同学 " + i));
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = MIN_CLASS_NUMBER; i < MIN_CLASS_NUMBER * 3; i++) {
executorService.execute(new LearnFromLeetCode(cyclicBarrier, "同学 " + i));
}
executorService.shutdown();
}
public static class LearnFromLeetCode implements Runnable {
private CyclicBarrier cyclicBarrier;
private String name;
public LearnFromLeetCode(CyclicBarrier cyclicBarrier, String name) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + " 报名");
Thread.sleep(1000);
cyclicBarrier.await();
System.out.println(name + " 开始上课");
Thread.sleep(1000);
System.out.println(name + " 毕业");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
执行结果:
同学 2 报名
同学 1 报名
同学 4 报名
同学 3 报名
同学 0 报名
人数到达开班要求,开始排课。
同学 5 报名
同学 1 开始上课
同学 2 开始上课
同学 3 开始上课
同学 4 开始上课
同学 0 开始上课
同学 2 毕业
同学 1 毕业
同学 4 毕业
同学 3 毕业
同学 8 报名
同学 7 报名
同学 6 报名
同学 0 毕业
同学 9 报名
同学 10 报名
人数到达开班要求,开始排课。
同学 7 开始上课
同学 5 开始上课
同学 9 开始上课
同学 6 开始上课
同学 8 开始上课
同学 9 毕业
同学 8 毕业
同学 11 报名
同学 6 毕业
同学 7 毕业
同学 5 毕业
同学 14 报名
同学 13 报名
同学 12 报名
人数到达开班要求,开始排课。
同学 10 开始上课
同学 14 开始上课
同学 11 开始上课
同学 12 开始上课
同学 13 开始上课
同学 10 毕业
同学 11 毕业
同学 13 毕业
同学 12 毕业
并发工具类三:Semaphore
Semaphore 的作用是「限流」,用于「同一个时刻最多只能允许若干个线程同时执行」。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
private static Semaphore semaphore = new Semaphore(2);
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(6, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 10; i++) {
executorService.execute(new Shopping(semaphore, "同学 " + i));
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
public static class Shopping implements Runnable {
private Semaphore semaphore;
private String name;
public Shopping(Semaphore semaphore, String name) {
this.semaphore = semaphore;
this.name = name;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(name + " 正在购物。。。");
Thread.sleep(3000);
System.out.println(name + " 走出商场");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
}
执行结果:
同学 0 正在购物。。。
同学 1 正在购物。。。
同学 1 走出商场
同学 0 走出商场
同学 6 正在购物。。。
同学 7 正在购物。。。
同学 6 走出商场
同学 7 走出商场
同学 8 正在购物。。。
同学 9 正在购物。。。
同学 8 走出商场
同学 9 走出商场
同学 2 正在购物。。。
同学 3 正在购物。。。
同学 2 走出商场
同学 3 走出商场
同学 4 正在购物。。。
同学 5 正在购物。。。
同学 5 走出商场
同学 4 走出商场
为什么要使用线程池
我们外出就餐,我们吃的是食物,使用的是共同的饭桌、椅子。除了一次性的餐具以外,很多与我们就餐相关的物品都是和其他顾客共享的,餐厅肯定不会为某一个顾客吃一顿饭专门准备一套饭桌和椅子。从就餐这件事情,我们知道,完成一件事情的所有前期工作(开销)可能比完成这件事情本身还要大。这些前期工作是类似的,并且可以重复使用。
线程池也是类似的思想。线程池是一种常用的并发编程技术,它可以提高程序的性能和可伸缩性,适用于需要处理大量并发任务的情况。以下是使用线程池的一些好处:
使用线程池可以提高程序的性能、可伸缩性、响应速度和稳定性,是一种常用的并发编程技术。除了线程池之外,还有许多其他类型的池化技术,例如:数据库连接池、内存池、网络连接池等。
四个常用的线程池
要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在 Executors 类 里面提供了一些静态工厂,生成一些常用的线程池。
注意:下面这些类在初学的时候、测试的时候使用,在生产环境中不要使用。
1、newSingleThreadExecutor
创建一个单线程的线程池。串行执行,其实就是排队。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
2、newFixedThreadPool
创建线程池的时候需要指定线程池里的线程数。创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
3、newCachedThreadPool
创建线程池的时候不需要指定线程池里的线程数。创建一个可缓存的线程池,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒处于等待任务到来)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池的最大值是 Integer 的最大值 (2
的31次方−1) 。
public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
4、newScheduledThreadPool
周期执行。此线程池支持定时以及周期性执行任务的需求。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
线程池的工作原理
线程池的工作原理在 java.util.concurrent.ThreadPoolExecutor 这个类的源码里有详细地介绍,如果英文看起来不够友好,可以看 这里 的中文翻译。在这里我们为大家简单归纳一下:
常见的面试题
1、什么是线程池?为什么要使用线程池?
线程池是一种管理和重用线程的机制,它可以减少线程的创建和销毁的开销,提高系统的性能和稳定性。
线程池的使用可以有效地避免线程频繁地创建和销毁,从而减少系统开销,提高应用程序的响应速度和并发能力。
2、线程池的工作原理是什么?
线程池的工作原理主要分为以下几步:
3、线程池的核心参数有哪些?如何设置线程池的大小?
线程池的核心参数包括以下几个:
阅读量:2034
点赞量:0
收藏量:0