Java 多线程 : 真想聊清楚线程池-灵析社区

带鱼

一 .   线程池简介

1 线程池的元素 线程池主要由两个概念组成,一个是任务队列,另一个是工作者线程。
  • 任务队列是一个阻塞队列,保存待执行的任务。
  • 工作者线程主体就是一个循环,循环从队列中接受任务并执行。
2 为什么要用线程池
  • 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
3 线程池中的核心概念
  • BlockingQueue workQueue : 用于保留任务并移交给工作线程的队列
  • HashSet workers : 线程池中所有的工作线程
4 线程池的原理定义 : 线程池通过一个叫 ctl 的 AtomicInteger 决定运行情况 , 通过 ThreadFactory 创建线程 , 并且把等待的线程放入 workQueue , 等待移交给工作线程

二.  常见的线程池

// 基本对象
ThreadPoolExecutor 

// 可重用固定线程数的线程池
FixedThreadPool 
	- ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
	
// 使用单个worker线程的Executor
SingleThreadExecutor 

// 会根据需要创建新线程的线程池
CachedThreadPool 


三. 线程池的创建

线程池创建可以通过 ThreadPoolExecutor 和 工具类 Executors 实现

3.1 通过构造方法实现(推荐)

通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则

3.2 通过Executor 框架的工具类Executors来实现 (个人demo 可以考虑)

3.2.1 FixedThreadPool
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
  • 该方法返回一个固定线程数量的线程池。 (corePoolSize == maximumPoolSize)
  • 使用LinkedBlockingQuene作为阻塞队列
  • 当线程池没有可执行任务时,也不会释放线程
  • 该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
3.2.2 SingleThreadExecutor
return new FinalizableDelegatedExecutorService (
new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));

  • 方法返回一个只有一个线程的线程池。
  • 若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
3.2.3 CachedThreadPool:
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
  • 该方法返回一个可根据实际情况调整线程数量的线程池。(默认缓存60s , 线程池的线程数可达到Integer.MAX_VALUE,即2147483647)
  • 内部使用SynchronousQueue作为阻塞队列
  • 线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。
  • 所有线程在当前任务执行完毕后,将返回线程池进行复用。
  • 在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源
3.2.4 ScheduledExecutorService :
return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); 
  • 初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据

四. Fork/Join

1 >  Fork / Join 的核心是 ForkJoinPool , 用于来管理工作线程 
	: 工作线程一次只能执行一个任务 ,
	: 不会根据任务创建线程,而是将任务存储到工作线程的双端队列中
2 > Fork / join 的思路是分而治之
    - Fork 递归的将任务分为较小的子任务
    - Join : 将子任务递归的串联成单个结果

3 > 工作窃取算法 : 空闲的线程试图从繁忙的线程(他们的双端队列)中窃取工作
    
// Fork/Join 依赖于 ForkJoinPool , 此处仅简单介绍 , 详情参考十六章

五. ThreadPoolExecutor

ThreadPoolExecutor实现了生产者/消费者模式,
    - 工作者线程就是消费者
    - 任务提交者就是生产者,线程池自己维护任务队列。

> ThreadPoolExecutor 
	- AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	: 此变量 记录了 “线程池中的任务数量”和“线程池的状态”两个信息
	: 高3位表示"线程池状态",低29位表示"线程池中的任务数量"
	  - RUNNING : 111 : 该线程池能接收新任务 ,且能对新任务进行处理
	  - SHUTDOWN : 000 : 不能接收新任务 ,但是可以对任务进行处理
	  - STOP : 001 : 不添加新任务 , 不对任务进行处理 , 会中断正在执行的任务
	  - TIDYING : 010 : 当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态
	  		- 当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态
	  - TERMINATED : 011 : 线程池彻底终止的状态
	
----------------------------------------

> ThreadPoolExecutor 的参数
	- corePoolSize : 线程池中核心线程的数量
	- maximumPoolSize : 线程池中允许的最大线程数
	- keepAliveTime : 线程空闲的时间
	- unit : keepAliveTime的单位
	- workQueue : 用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口
	- threadFactory : 用于设置创建线程的工厂
	- allowCoreThreadTimeOut : 允许核心线程过期
	- Handler : 处理器
            - defaultHandler : 任务拒绝处理器
	

六 .  线程池的饱和和动态调整

// 线程池的饱和策略 , 当线程池满了.  会通过对应的策略
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;


// ThreadPoolExecutor 提供了动态调整线程池容量大小的方法:
• setCorePoolSize:设置核心池大小。
• setMaximumPoolSize:设置线程池最大能创建的线程数目大小。
当上述参数从小变大时,ThreadPoolExecutor 进行线程赋值,还可能立即创建新的线程来执行任务。

// 动态调整源码核心 : 
    

七. 线程池执行任务的过程

刚创建时,里面没有线程调用 execute() 方法,添加任务时:

  • 完成一个任务,继续取下一个任务处理。 没有任务继续处理,线程被中断或者线程池被关闭时,线程退出执行,如果线程池被关闭,线程结束。 否则,判断线程池正在运行的线程数量是否大于核心线程数,如果是,线程结束,否则线程阻塞。因此线程池任务全部执行完成后,继续留存的线程池大小为 corePoolSize 。

八. 线程池中 submit 和 execute 方法有什么区别

两个方法都可以向线程池提交任务。

  • #execute(...) 方法,返回类型是 void ,它定义在 Executor 接口中 , 必须实现Runnable接口 。
  • #submit(...) 方法,可以返回持有计算结果的 Future 对象,它定义在 ExecutorService 接口中,它扩展了 Executor 接口,其它线程池类像 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 都有这些方法。

九 .  如果你提交任务时,线程池队列已满,这时会发生什么

重点在于线程池的队列是有界还是无界的

> 如果你使用的 LinkedBlockingQueue,也就是无界队列的话,没关系,继续添加任务到阻塞队列中等待执行,因为 LinkedBlockingQueue 可以近乎认为是一个无穷大的队列,可以无限存放任务。

> 如果你使用的是有界队列比方说 ArrayBlockingQueue 的话,任务首先会被添加到 ArrayBlockingQueue 中,ArrayBlockingQueue满了,则会使用拒绝策略 RejectedExecutionHandler 处理满了的任务,默认是 AbortPolicy 。

十 .  线程池的底层逻辑

要想弄清楚这一部分 , 首先得理解 Queue , Worker , Task , Thread 等多个概念

  • Queue :
  • Worker :
  • Task :
  • Thread :

10.1 线程池的物理结构

Worker 对象

Worker 对象是 ThreadPoolExecutor 中的一个内部类 , 他是一个包装类 , 是一个线程单元 , 同时提供线程的中断等功能

// 问题一 : Worker 结构
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    
    // 被封装的线程
    final Thread thread;
    // 初始任务
    Runnable firstTask;
    // 线程任务计数器
    volatile long completedTasks;

    // 可以看到 , 把 worker 都行构建成了 Thread
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

  
    public void run() {
        runWorker(this);
    }

    protected boolean isHeldExclusively() {....}
    // 获取同步状态
    protected boolean tryAcquire(int unused) {....}
    // 释放同步状态
    protected boolean tryRelease(int unused) {....}
    
    //锁操作
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    // 暂停操作
    void interruptIfStarted(){....}
}

// 问题二 : ThreadPoolExecutor中的线程包装
- 线程被封装成一个对象Worker
- 通过调用 runWorker(Worker w) 获取任务并执行的死循环 
- 如果任务的运行出了什么问题 ,调用 processWorkerExit() 处理
    
C- ThreadPoolExecutor
    PVC- Worker
    	M- run : public void run() {runWorker(this); }
拒绝策略部分 Policy

ThreadPoolExecutor 中提供了4 个拒绝策略内部类  , 具体的类型详见上文 , 这里来看一下结构 :

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

10.2 主要流程

Step 1 : 进入的起点 - 线程的封装
// task 的构建 : 匿名传进来的线程会构建成一个 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
Step 2 : 运行的起点 - execute

ThreadPoolExecutor 中 excutor 方法是执行的起点 , 其中会进行三种操作

  1. 当线程池未满时 , 直接 addWorker 运行
  2. 当线程池满了且正在运行时 , 将线程加入 workQueue 中
  3. 当上述均失败后 , 就会调用 reject 来处理异常情况 (RejectedExecutionHandler)
// 问题一 : execute 中线程池处理任务的逻辑
1	        int c = ctl.get();
2	        if (workerCountOf(c) < corePoolSize) {
3	            if (addWorker(command, true))
4	                return;
5	            c = ctl.get();
6	        }
7	        if (isRunning(c) && workQueue.offer(command)) {
8	            int recheck = ctl.get();
9	            if (! isRunning(recheck) && remove(command))
10	                reject(command);
11	            else if (workerCountOf(recheck) == 0)
12	                addWorker(null, false);
13	        }
14	        else if (!addWorker(command, false))
15	            reject(command);
     
// 2 : workerCountOf 判断当前线程数是否小于corePoolSize 从而决定是否通过 addWorker 创建线程
// 7 : 如果线程池已满 ,且状态为 running , 尝试把任务添加到 workQueue
// 14 : 如果 7 步处理失败 , 尝试 addWorker , 失败则通过 reject 处理  


//补充 : addWork 作用
- 检查是否可以根据当前池状态和给定边界(核心或最大)添加新的工作者 
- 创建并启动新的worker,运行firstTask作为它的第一个任务  
// 问题二 : 线程池运行 Work 详情 (简述一下就是核心的四步)
1 addWorker(Runnable firstTask, boolean core)  : 可以看到addWorker 添加的是一个 Runnable
2 new Worker(firstTask) :如果状态符合 ,会创建一个 Worker 对象
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
} 
3 final Thread t = w.thread;
    ?- 这里将Thread 取了出来
    
4 后文将会 t.start()运行  

// 补充 : 期间还会进行锁的处理 , 省略一些的主要流程如下
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 核心一 : 状态判断
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        
        // 核心二 : 容量满了的处理 , 退出或者重试 (可以看到 c 语言的影子)
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 核心三 : 处理开始
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 核心四 : 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Step 3 : 工厂的创建
// 问题四 : 创建工厂
从上文说到 , 线程池通过 ThreadFactory 创建线程 (newThread()) , 

Step 4 : 线程的复用

在上文 Step 1 问题一 中 , 将 线程加入到 workQueue 中了isRunning(c) && workQueue.offer(command) , 这里就是取出来的步骤 :

// 这个问题涉及到的方法主要包括 getTask () 
M- runWorker
     while (task != null || (task = getTask()) != null) : 死循环 , 只要还有 task 就会执行
         - task.run() : 获取到 task 后 通过 task run 执行
         
M- getTask()
     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
     workQueue.take();
     
// 补充 : 这里可以看到 , worker 可以理解为一个工作线程 ,他通过 while 不停的从 queue 中获取 task 执行

// 这里很有趣 , worker 更像一个加工工厂 , 我一开始还以为迭代的是 worker , 现在发现是在 worker 上锁后在里面迭代
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task 实际上不是 worker 的内部属性
        while (task != null || (task = getTask()) != null) {
            // 上锁
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                // 线程执行
                task.run();
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Step 5 : 拒绝策略
// 瞅一瞅拒绝策略 : 
当你的线程池满了后 , 通常这个异常就爆出来了
java.util.concurrent.RejectedExecutionException:
    Task java.util.concurrent.FutureTask@523424b5 rejected from
    java.util.concurrent.ThreadPoolExecutor@319dead1
    [Running, pool size = 6, active threads = 6, queued tasks = 3, completed tasks = 0]

- 尽管我们可以通过拒绝策略有很多种 ,但是超高并发的时候哪一种都不靠谱 , 所以我们先看下 , 这个拒绝策略怎么来的
    1 从问题三代码的第七行我们就能看到 workQueue.offer(command) , queue 已经 offer 失败了 , 说明Queue 也满了
    2 到14行 , 再次通过 addWork 直接运行 , 失败了
    3 执行了 reject 方法 , handler.rejectedExecution(command, this);
            ?- handler 是接口 ,他有四个实现类 , 具体含义可以见上文拒绝策略 
    4 例如 AbortPolicy 就是 throw new RejectedExecutionException , CallerRunsPolicy 就是再次run 
    (主线程慢慢跑 , 肯定慢的)

- 所以部分业务我们要改 , 怎么改 ? 
    1 spring 里面可以自定义你的拒绝策略 , 可以参考这一篇的用法
        @ https://blog.csdn.net/tanglei6636/article/details/90721801
    2 ThreadPoolExecutor 构造器里面也有
    
- 改的思路 ?
    前提一是集群已经无法解决 (基本上现阶段集群都能满足) ,且你无法节流
    1 放到消息队列
    2 入库 
    3 写盘
    4 放集合 , 单独一个线程 , 用一个取一个 
    
Step 6 : 线程的关闭
1 checkShutdownAccess 校验是否可以关闭
2 RunState 改为 STOP
3 ReentrantLock and isInterrupted
4 drainQueue : remove queue 队列    
Step 7 : 如何实现回调 ?
submit 回调
- <T> Future<T> submit(Callable<T> task)
        ?- 很明显 , submit 返回的是 Future , 这就意味着主线程能阻塞等待
        - RunnableFuture<T> ftask = newTaskFor(task);

10.2 底层复杂分析

ctl 到底怎么玩的 ?
> private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    ?- 线程池里面通过 ctl 来判断线程的状态 , 前面说了线程池高3位表示 "线程池状态",低29位表示线程池中的任务数量-
    ?- 以 STOP 状态为例 , 在运行的时候 ,他的十进制值为 536870912
        - 首先 ,我们将他转换为二进制 -> 10000 00000 00000 00000 00000 00000
        - 获取后面的29位 ,然后前面补齐 , 最后的高三位即为 001
    ?- 而 TIDYING 对应的就是 00001 00000 00000 00000 00000 00000 00000 -> 010
    ?- RUNNING 为 -1 , 按照为数不多的一点残留知识 , 这里说成111是因为负数按照补码表示的原因
    ?- 众所周知 , 二进制处理的效率最高 ,所以这么玩合情合理
线程池公式
> 计算密集型 :Ncpu + 1
    
> 包含了 I/O和其他阻塞操作的任务 : Nthreads = Ncpu x Ucpu x (1 + W/C)
  Ncpu = CPU的数量
  Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1
  W/C = 等待时间与计算时间的比率
    
> IO密集型 = 2Ncpu
比较当前线程容量的方法 workerCountOf(c) 为什么要 & 一个 CAPACITY ?
1 > public static final int SIZE = 32;
	?- 用二进制补码形式表示int值的位数
2 >  private static final int COUNT_BITS = Integer.SIZE - 3;
3 >  private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
4 >  private static int workerCountOf(int c)  { return c & CAPACITY; }

// 原因一 : 还是状态的原因 , 低 29 位才是 线程数量 , 加上这个参数才能包装低 29 二进制时最开始为 0 

十一.  线程池使用


// 构造一个线程池 , 推荐用法 
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3));


// 除了测试 , 尽量避免使用以下方法构建线程池
// 线程创建
// 1 CachedThreadPool
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {}
  
// 2 FixedThreadPool
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {}

// 3 SingleThreadExecutor
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {}

// 4 SingleThreadScheduledExecutor
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleWithFixedDelay(() -> {}
                                
                                
// 线程池的关闭
executor.shutdown();
executor.shutdownNow();
                                
// 信息获取
executor.isTerminated() : 是否关闭
executor.getPoolSize()   
executor.getQueue().size()         
                                

十二 .  线程池的想法

// 使用线程池时有一些规约和建议是需要注意的 : 
	- 创建线程或线程池时请指定有意义的线程名称 
	- 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式
        	- 并且不建议创建无界线程 , 避免 OOM
	- 必须回收自定义的 ThreadLocal 变量,尤其在线程池场景下

// 注意点 : 
	- 注意线程池的拒绝策略 , 当线程池满了时 , 可能会因为策略带来系统崩溃
	- newCachedThreadPool 也有可能出现 OOM , 其最大值为 newCachedThreadPool 
	-  


阅读量:2018

点赞量:0

收藏量:0