1 线程池的元素 线程池主要由两个概念组成,一个是任务队列,另一个是工作者线程。
2 为什么要用线程池
3 线程池中的核心概念
4 线程池的原理定义 : 线程池通过一个叫 ctl 的 AtomicInteger 决定运行情况 , 通过 ThreadFactory 创建线程 , 并且把等待的线程放入 workQueue , 等待移交给工作线程
// 基本对象
ThreadPoolExecutor
// 可重用固定线程数的线程池
FixedThreadPool
- ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
// 使用单个worker线程的Executor
SingleThreadExecutor
// 会根据需要创建新线程的线程池
CachedThreadPool
线程池创建可以通过 ThreadPoolExecutor 和 工具类 Executors 实现
通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则
3.2.1 FixedThreadPool
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
3.2.2 SingleThreadExecutor
return new FinalizableDelegatedExecutorService (
new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
3.2.3 CachedThreadPool:
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
3.2.4 ScheduledExecutorService :
return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
1 > Fork / Join 的核心是 ForkJoinPool , 用于来管理工作线程
: 工作线程一次只能执行一个任务 ,
: 不会根据任务创建线程,而是将任务存储到工作线程的双端队列中
2 > Fork / join 的思路是分而治之
- Fork 递归的将任务分为较小的子任务
- Join : 将子任务递归的串联成单个结果
3 > 工作窃取算法 : 空闲的线程试图从繁忙的线程(他们的双端队列)中窃取工作
// Fork/Join 依赖于 ForkJoinPool , 此处仅简单介绍 , 详情参考十六章
ThreadPoolExecutor实现了生产者/消费者模式,
- 工作者线程就是消费者
- 任务提交者就是生产者,线程池自己维护任务队列。
> ThreadPoolExecutor
- AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
: 此变量 记录了 “线程池中的任务数量”和“线程池的状态”两个信息
: 高3位表示"线程池状态",低29位表示"线程池中的任务数量"
- RUNNING : 111 : 该线程池能接收新任务 ,且能对新任务进行处理
- SHUTDOWN : 000 : 不能接收新任务 ,但是可以对任务进行处理
- STOP : 001 : 不添加新任务 , 不对任务进行处理 , 会中断正在执行的任务
- TIDYING : 010 : 当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态
- 当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态
- TERMINATED : 011 : 线程池彻底终止的状态
----------------------------------------
> ThreadPoolExecutor 的参数
- corePoolSize : 线程池中核心线程的数量
- maximumPoolSize : 线程池中允许的最大线程数
- keepAliveTime : 线程空闲的时间
- unit : keepAliveTime的单位
- workQueue : 用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口
- threadFactory : 用于设置创建线程的工厂
- allowCoreThreadTimeOut : 允许核心线程过期
- Handler : 处理器
- defaultHandler : 任务拒绝处理器
// 线程池的饱和策略 , 当线程池满了. 会通过对应的策略
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
// ThreadPoolExecutor 提供了动态调整线程池容量大小的方法:
• setCorePoolSize:设置核心池大小。
• setMaximumPoolSize:设置线程池最大能创建的线程数目大小。
当上述参数从小变大时,ThreadPoolExecutor 进行线程赋值,还可能立即创建新的线程来执行任务。
// 动态调整源码核心 :
刚创建时,里面没有线程调用 execute() 方法,添加任务时:
两个方法都可以向线程池提交任务。
重点在于线程池的队列是有界还是无界的。
> 如果你使用的 LinkedBlockingQueue,也就是无界队列的话,没关系,继续添加任务到阻塞队列中等待执行,因为 LinkedBlockingQueue 可以近乎认为是一个无穷大的队列,可以无限存放任务。
> 如果你使用的是有界队列比方说 ArrayBlockingQueue 的话,任务首先会被添加到 ArrayBlockingQueue 中,ArrayBlockingQueue满了,则会使用拒绝策略 RejectedExecutionHandler 处理满了的任务,默认是 AbortPolicy 。
要想弄清楚这一部分 , 首先得理解 Queue , Worker , Task , Thread 等多个概念
Worker 对象
Worker 对象是 ThreadPoolExecutor 中的一个内部类 , 他是一个包装类 , 是一个线程单元 , 同时提供线程的中断等功能
// 问题一 : Worker 结构
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 被封装的线程
final Thread thread;
// 初始任务
Runnable firstTask;
// 线程任务计数器
volatile long completedTasks;
// 可以看到 , 把 worker 都行构建成了 Thread
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {....}
// 获取同步状态
protected boolean tryAcquire(int unused) {....}
// 释放同步状态
protected boolean tryRelease(int unused) {....}
//锁操作
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 暂停操作
void interruptIfStarted(){....}
}
// 问题二 : ThreadPoolExecutor中的线程包装
- 线程被封装成一个对象Worker
- 通过调用 runWorker(Worker w) 获取任务并执行的死循环
- 如果任务的运行出了什么问题 ,调用 processWorkerExit() 处理
C- ThreadPoolExecutor
PVC- Worker
M- run : public void run() {runWorker(this); }
拒绝策略部分 Policy
ThreadPoolExecutor 中提供了4 个拒绝策略内部类 , 具体的类型详见上文 , 这里来看一下结构 :
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Step 1 : 进入的起点 - 线程的封装
// task 的构建 : 匿名传进来的线程会构建成一个 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
Step 2 : 运行的起点 - execute
ThreadPoolExecutor 中 excutor 方法是执行的起点 , 其中会进行三种操作
// 问题一 : execute 中线程池处理任务的逻辑
1 int c = ctl.get();
2 if (workerCountOf(c) < corePoolSize) {
3 if (addWorker(command, true))
4 return;
5 c = ctl.get();
6 }
7 if (isRunning(c) && workQueue.offer(command)) {
8 int recheck = ctl.get();
9 if (! isRunning(recheck) && remove(command))
10 reject(command);
11 else if (workerCountOf(recheck) == 0)
12 addWorker(null, false);
13 }
14 else if (!addWorker(command, false))
15 reject(command);
// 2 : workerCountOf 判断当前线程数是否小于corePoolSize 从而决定是否通过 addWorker 创建线程
// 7 : 如果线程池已满 ,且状态为 running , 尝试把任务添加到 workQueue
// 14 : 如果 7 步处理失败 , 尝试 addWorker , 失败则通过 reject 处理
//补充 : addWork 作用
- 检查是否可以根据当前池状态和给定边界(核心或最大)添加新的工作者
- 创建并启动新的worker,运行firstTask作为它的第一个任务
// 问题二 : 线程池运行 Work 详情 (简述一下就是核心的四步)
1 addWorker(Runnable firstTask, boolean core) : 可以看到addWorker 添加的是一个 Runnable
2 new Worker(firstTask) :如果状态符合 ,会创建一个 Worker 对象
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
3 final Thread t = w.thread;
?- 这里将Thread 取了出来
4 后文将会 t.start()运行
// 补充 : 期间还会进行锁的处理 , 省略一些的主要流程如下
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 核心一 : 状态判断
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 核心二 : 容量满了的处理 , 退出或者重试 (可以看到 c 语言的影子)
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 核心三 : 处理开始
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 核心四 : 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Step 3 : 工厂的创建
// 问题四 : 创建工厂
从上文说到 , 线程池通过 ThreadFactory 创建线程 (newThread()) ,
Step 4 : 线程的复用
在上文 Step 1 问题一 中 , 将 线程加入到 workQueue 中了isRunning(c) && workQueue.offer(command)
, 这里就是取出来的步骤 :
// 这个问题涉及到的方法主要包括 getTask ()
M- runWorker
while (task != null || (task = getTask()) != null) : 死循环 , 只要还有 task 就会执行
- task.run() : 获取到 task 后 通过 task run 执行
M- getTask()
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 补充 : 这里可以看到 , worker 可以理解为一个工作线程 ,他通过 while 不停的从 queue 中获取 task 执行
// 这里很有趣 , worker 更像一个加工工厂 , 我一开始还以为迭代的是 worker , 现在发现是在 worker 上锁后在里面迭代
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task 实际上不是 worker 的内部属性
while (task != null || (task = getTask()) != null) {
// 上锁
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
// 线程执行
task.run();
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Step 5 : 拒绝策略
// 瞅一瞅拒绝策略 :
当你的线程池满了后 , 通常这个异常就爆出来了
java.util.concurrent.RejectedExecutionException:
Task java.util.concurrent.FutureTask@523424b5 rejected from
java.util.concurrent.ThreadPoolExecutor@319dead1
[Running, pool size = 6, active threads = 6, queued tasks = 3, completed tasks = 0]
- 尽管我们可以通过拒绝策略有很多种 ,但是超高并发的时候哪一种都不靠谱 , 所以我们先看下 , 这个拒绝策略怎么来的
1 从问题三代码的第七行我们就能看到 workQueue.offer(command) , queue 已经 offer 失败了 , 说明Queue 也满了
2 到14行 , 再次通过 addWork 直接运行 , 失败了
3 执行了 reject 方法 , handler.rejectedExecution(command, this);
?- handler 是接口 ,他有四个实现类 , 具体含义可以见上文拒绝策略
4 例如 AbortPolicy 就是 throw new RejectedExecutionException , CallerRunsPolicy 就是再次run
(主线程慢慢跑 , 肯定慢的)
- 所以部分业务我们要改 , 怎么改 ?
1 spring 里面可以自定义你的拒绝策略 , 可以参考这一篇的用法
@ https://blog.csdn.net/tanglei6636/article/details/90721801
2 ThreadPoolExecutor 构造器里面也有
- 改的思路 ?
前提一是集群已经无法解决 (基本上现阶段集群都能满足) ,且你无法节流
1 放到消息队列
2 入库
3 写盘
4 放集合 , 单独一个线程 , 用一个取一个
Step 6 : 线程的关闭
1 checkShutdownAccess 校验是否可以关闭
2 RunState 改为 STOP
3 ReentrantLock and isInterrupted
4 drainQueue : remove queue 队列
Step 7 : 如何实现回调 ?
submit 回调
- <T> Future<T> submit(Callable<T> task)
?- 很明显 , submit 返回的是 Future , 这就意味着主线程能阻塞等待
- RunnableFuture<T> ftask = newTaskFor(task);
ctl 到底怎么玩的 ?
> private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
?- 线程池里面通过 ctl 来判断线程的状态 , 前面说了线程池高3位表示 "线程池状态",低29位表示线程池中的任务数量-
?- 以 STOP 状态为例 , 在运行的时候 ,他的十进制值为 536870912
- 首先 ,我们将他转换为二进制 -> 10000 00000 00000 00000 00000 00000
- 获取后面的29位 ,然后前面补齐 , 最后的高三位即为 001
?- 而 TIDYING 对应的就是 00001 00000 00000 00000 00000 00000 00000 -> 010
?- RUNNING 为 -1 , 按照为数不多的一点残留知识 , 这里说成111是因为负数按照补码表示的原因
?- 众所周知 , 二进制处理的效率最高 ,所以这么玩合情合理
线程池公式
> 计算密集型 :Ncpu + 1
> 包含了 I/O和其他阻塞操作的任务 : Nthreads = Ncpu x Ucpu x (1 + W/C)
Ncpu = CPU的数量
Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1
W/C = 等待时间与计算时间的比率
> IO密集型 = 2Ncpu
比较当前线程容量的方法 workerCountOf(c) 为什么要 & 一个 CAPACITY ?
1 > public static final int SIZE = 32;
?- 用二进制补码形式表示int值的位数
2 > private static final int COUNT_BITS = Integer.SIZE - 3;
3 > private static final int CAPACITY = (1 << COUNT_BITS) - 1;
4 > private static int workerCountOf(int c) { return c & CAPACITY; }
// 原因一 : 还是状态的原因 , 低 29 位才是 线程数量 , 加上这个参数才能包装低 29 二进制时最开始为 0
// 构造一个线程池 , 推荐用法
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3));
// 除了测试 , 尽量避免使用以下方法构建线程池
// 线程创建
// 1 CachedThreadPool
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {}
// 2 FixedThreadPool
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {}
// 3 SingleThreadExecutor
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {}
// 4 SingleThreadScheduledExecutor
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleWithFixedDelay(() -> {}
// 线程池的关闭
executor.shutdown();
executor.shutdownNow();
// 信息获取
executor.isTerminated() : 是否关闭
executor.getPoolSize()
executor.getQueue().size()
// 使用线程池时有一些规约和建议是需要注意的 :
- 创建线程或线程池时请指定有意义的线程名称
- 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式
- 并且不建议创建无界线程 , 避免 OOM
- 必须回收自定义的 ThreadLocal 变量,尤其在线程池场景下
// 注意点 :
- 注意线程池的拒绝策略 , 当线程池满了时 , 可能会因为策略带来系统崩溃
- newCachedThreadPool 也有可能出现 OOM , 其最大值为 newCachedThreadPool
-
阅读量:2018
点赞量:0
收藏量:0