JUC-中-灵析社区

菜鸟码转

三、并发工具类1:CountDownLatch、CyclicBarrier、Semaphor

CountDownLatch、 CyclicBarrier 与 Semaphore 都是 Java 的并发工具,其中 CountDownLatch 是减法计数器,CyclicBarrier 是循环栅栏,现在举例如下。

并发工具类一:CountDownLatch

CountDownLatch 用于需要等待其它业务线程执行完成以后,主线程才能接着执行的场景。一般的用法是:

  • 初始化的时候指定一个整数(计数器个数);
  • 主线程执行 latch.await(); 方法,等待其它线程执行完毕,其它线程必须要调用 latch.countDown(); 使得计数器个数减 1;
  • 直到计数器个数的值为 0 的时候,主线程的 latch.await(); 方法后面的代码才能继续执行。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CountDownLatchDemo {

    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(5);
        ExecutorService executorService = new ThreadPoolExecutor(6, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());

        for (int i = 0; i < 6; i++) {
            final int num = i + 1;
            Runnable runnable = () -> {
                try {
                    Thread.sleep(num * 1000);
                    System.out.println("运动员 " + num + " 到达了终点。");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    latch.countDown();
                }
            };
            executorService.submit(runnable);
        }

        try {
            System.out.println("裁判员一声令下:各就位!");
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("主线程结束。");

        executorService.shutdown();
    }
}

执行结果:

裁判员一声令下:各就位!
运动员 1 到达了终点。
运动员 2 到达了终点。
运动员 3 到达了终点。
运动员 4 到达了终点。
运动员 5 到达了终点。
主线程结束。
运动员 6 到达了终点。

区别于 CyclicBarrier ,CountDownLatch 只能使用一次。

并发工具类二:CyclicBarrier

CyclicBarrier 是 Java 并发包中的一个同步工具类,它允许一组线程互相等待,直到所有线程都到达某个屏障点,然后再一起继续执行。它的构造方法接收一个整数作为参数,表示需要等待的线程数量。

当一个线程调用 CyclicBarrier 的 await() 方法时,它会被阻塞,直到所有线程都调用了该方法。一旦所有线程都到达屏障点,所有线程都会被释放,同时 CyclicBarrier 的计数器会被重置,可以被重复使用。

CyclicBarrier 的核心逻辑是 barrierAction,即当所有线程到达屏障点时,要执行的任务。barrierAction 是一个可选参数,在 CyclicBarrier 的构造方法中可以指定。如果指定了 barrierAction,那么当所有线程都到达屏障点时,会先执行 barrierAction 中的任务,然后再释放所有线程。如果没有指定 barrierAction,那么所有线程到达屏障点后直接释放。

barrierAction 可以用来执行一些在所有线程都到达屏障点之前必须完成的操作,比如检查所有线程的运行结果并进行汇总。如果 barrierAction 中的任务抛出异常,那么所有线程都会立即中断并抛出 BrokenBarrierException 异常。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo {

    private static final int MIN_CLASS_NUMBER = 5;

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(MIN_CLASS_NUMBER, () -> System.out.println("人数到达开班要求,开始排课。"));

    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(6, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < MIN_CLASS_NUMBER; i++) {
            executorService.execute(new LearnFromLeetCode(cyclicBarrier, "同学 " + i));
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = MIN_CLASS_NUMBER; i < MIN_CLASS_NUMBER * 3; i++) {
            executorService.execute(new LearnFromLeetCode(cyclicBarrier, "同学 " + i));
        }
        executorService.shutdown();
    }

    public static class LearnFromLeetCode implements Runnable {

        private CyclicBarrier cyclicBarrier;

        private String name;

        public LearnFromLeetCode(CyclicBarrier cyclicBarrier, String name) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " 报名");
                Thread.sleep(1000);
                cyclicBarrier.await();
                System.out.println(name + " 开始上课");
                Thread.sleep(1000);
                System.out.println(name + " 毕业");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:

同学 2 报名
同学 1 报名
同学 4 报名
同学 3 报名
同学 0 报名
人数到达开班要求,开始排课。
同学 5 报名
同学 1 开始上课
同学 2 开始上课
同学 3 开始上课
同学 4 开始上课
同学 0 开始上课
同学 2 毕业
同学 1 毕业
同学 4 毕业
同学 3 毕业
同学 8 报名
同学 7 报名
同学 6 报名
同学 0 毕业
同学 9 报名
同学 10 报名
人数到达开班要求,开始排课。
同学 7 开始上课
同学 5 开始上课
同学 9 开始上课
同学 6 开始上课
同学 8 开始上课
同学 9 毕业
同学 8 毕业
同学 11 报名
同学 6 毕业
同学 7 毕业
同学 5 毕业
同学 14 报名
同学 13 报名
同学 12 报名
人数到达开班要求,开始排课。
同学 10 开始上课
同学 14 开始上课
同学 11 开始上课
同学 12 开始上课
同学 13 开始上课
同学 10 毕业
同学 11 毕业
同学 13 毕业
同学 12 毕业

并发工具类三:Semaphore

Semaphore 的作用是「限流」,用于「同一个时刻最多只能允许若干个线程同时执行」。

  • Semaphore 初始化的时候需要指定一个整数,表示「同一个时刻最多只能允许若干个线程同时执行」的线程数;
  • 每一个线程在开始执行之前,需要调用 semaphore.acquire(); 获得一个许可证;
  • 每一个线程在结束之后,需要调用 semaphore.release(); 释放一个许可证。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {

    private static Semaphore semaphore = new Semaphore(2);

    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(6, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 10; i++) {
            executorService.execute(new Shopping(semaphore, "同学 " + i));
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executorService.shutdown();
    }

    public static class Shopping implements Runnable {

        private Semaphore semaphore;

        private String name;

        public Shopping(Semaphore semaphore, String name) {
            this.semaphore = semaphore;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(name + " 正在购物。。。");
                Thread.sleep(3000);

                System.out.println(name + " 走出商场");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }
}

执行结果:


同学 0 正在购物。。。
同学 1 正在购物。。。
同学 1 走出商场
同学 0 走出商场
同学 6 正在购物。。。
同学 7 正在购物。。。
同学 6 走出商场
同学 7 走出商场
同学 8 正在购物。。。
同学 9 正在购物。。。
同学 8 走出商场
同学 9 走出商场
同学 2 正在购物。。。
同学 3 正在购物。。。
同学 2 走出商场
同学 3 走出商场
同学 4 正在购物。。。
同学 5 正在购物。。。
同学 5 走出商场
同学 4 走出商场

四、并发工具类2

为什么要使用线程池

我们外出就餐,我们吃的是食物,使用的是共同的饭桌、椅子。除了一次性的餐具以外,很多与我们就餐相关的物品都是和其他顾客共享的,餐厅肯定不会为某一个顾客吃一顿饭专门准备一套饭桌和椅子。从就餐这件事情,我们知道,完成一件事情的所有前期工作(开销)可能比完成这件事情本身还要大。这些前期工作是类似的,并且可以重复使用。

线程池也是类似的思想。线程池是一种常用的并发编程技术,它可以提高程序的性能和可伸缩性,适用于需要处理大量并发任务的情况。以下是使用线程池的一些好处:

  • 重复利用线程:线程池中的线程可以被重复利用,可以避免频繁地创建和销毁线程,从而提高程序的效率;
  • 提高程序的响应速度:线程池中的线程可以并行执行多个任务,可以显著减少任务的响应时间,提高程序的响应速度;
  • 控制并发:线程池可以控制并发线程的数量,可以根据实际情况动态调整线程池中线程的数量,从而避免过度创建线程导致系统资源不足。

使用线程池可以提高程序的性能、可伸缩性、响应速度和稳定性,是一种常用的并发编程技术。除了线程池之外,还有许多其他类型的池化技术,例如:数据库连接池、内存池、网络连接池等。

四个常用的线程池

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在 Executors 类 里面提供了一些静态工厂,生成一些常用的线程池。

注意:下面这些类在初学的时候、测试的时候使用,在生产环境中不要使用。

1、newSingleThreadExecutor

创建一个单线程的线程池。串行执行,其实就是排队。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

public static ExecutorService newSingleThreadExecutor() 
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

2、newFixedThreadPool

创建线程池的时候需要指定线程池里的线程数。创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

3、newCachedThreadPool

创建线程池的时候不需要指定线程池里的线程数。创建一个可缓存的线程池,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒处于等待任务到来)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池的最大值是 Integer 的最大值 (2

的31次方−1) 。

public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

4、newScheduledThreadPool

周期执行。此线程池支持定时以及周期性执行任务的需求。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

线程池的工作原理

线程池的工作原理在 java.util.concurrent.ThreadPoolExecutor 这个类的源码里有详细地介绍,如果英文看起来不够友好,可以看 这里 的中文翻译。在这里我们为大家简单归纳一下:

  • 一开始有任务来,把任务交给「核心线程 corePoolSize」工作;
  • 如果「核心线程」都在工作,就新来的任务在「阻塞队列 BlockingQueue」里等待;
  • 如果「阻塞队列」都满了,就开启「非核心线程」工作。这里「非核心线程」可以理解为临时工;
  • 如果「非核心线程」都在工作,再来新的任务就装不下了,就要采取相应的拒绝策略。

常见的面试题

1、什么是线程池?为什么要使用线程池?

线程池是一种管理和重用线程的机制,它可以减少线程的创建和销毁的开销,提高系统的性能和稳定性。

线程池的使用可以有效地避免线程频繁地创建和销毁,从而减少系统开销,提高应用程序的响应速度和并发能力。

2、线程池的工作原理是什么?

线程池的工作原理主要分为以下几步:

  • 初始化:线程池初始化时,会创建一定数量的核心线程以及一个任务队列。这些核心线程会一直存在,除非被线程池关闭或者被替换掉;
  • 任务提交:当有新任务需要执行时,将任务放入任务队列中;
  • 线程执行任务:线程池中的线程会从任务队列中取出任务并执行;
  • 线程池维护:线程池会维护线程的数量,当线程数量超过预设的上限时,新的任务会被放入任务队列中等待执行。

3、线程池的核心参数有哪些?如何设置线程池的大小?

线程池的核心参数包括以下几个:

  • 核心线程数:线程池中始终保持的线程数;
  • 最大线程数:线程池中最多允许的线程数(可以这样理解:最大线程数 - 核心线程数 = 非核心线程数,非核心线程可以看成「临时工」 ,任务队列满了的时候,才会启动非核心线程);
  • 任务队列:存放等待执行的任务的队列;
  • 线程空闲时间:线程在空闲状态下的最大存活时间;
  • 线程池拒绝策略:当任务队列已满且线程数量达到最大值时,线程池如何处理新的任务。

阅读量:2034

点赞量:0

收藏量:0