一句话概括AQS :
java.util.concurrent.locks.AbstractQueuedSynchronizer 抽象类,简称 AQS , 队列同步器
作用 : 用于构建锁和同步容器的同步器
原理 : 使用一个 FIFO 的队列表示排队等待锁的线程
场景 : AQS解决了实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列
AQS 使用一个int 的 status 来表示同步状态, 同步状态重要目的是用于跟踪线程是否应该阻塞 , 当它的前身释放时,一个节点被通知。否则,队列中的每个节点都充当一个特定通知样式的监视器,该监视器持有单个等待线程.
状态处理
独占锁相关方法
共享锁相关方法
独占式获取同步状态
当前线程为获取到同步状态而进入到同步队列中
如果当前线程被中断,则抛出 InterruptedException()
如果未中断 ,将尝试调用 tryAcquire , 调用失败线程将进入队列,可能会反复阻塞和解除阻塞
如果当前线程被中断,则抛出 InterruptedException()
如果当前线程在 nanos 时间内没有获取到同步状态,那么将会返回 false ,已经获取则返回 true 。
未超时未获取会一只排队 ,反复阻塞
共享式获取同步状态
释放同步状态
public class SimpleLock extends AbstractQueuedSynchronizer {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
protected boolean tryAcquire(int unused) {
logger.info("------> try tryAcquire :{} <-------", unused);
//使用compareAndSetState控制AQS中的同步变量
if (compareAndSetState(0, 1)) {
logger.info("------> cas success ");
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int unused) {
logger.info("------> try tryRelease :{} <-------", unused);
setExclusiveOwnerThread(null);
//使用setState控制AQS中的同步变量
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
}
其他关联知识点 : 1 AQS的所有子类中,要么使用了它的独占锁,要么使用了它的共享锁,不会同时使用它的两个锁。
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列 该队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
AQS定义两种资源共享方式 Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:
以下方法未重写抛出 UnsupportedOperationException
Semaphore(信号量)
功能 : 允许多个线程同时访问
synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。
CountDownLatch (倒计时器) 功能 : CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。
这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
CyclicBarrier(循环栅栏)
功能 : CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。
应用场景 : 和 CountDownLatch 类似。CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。
作用 : 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
独占式获取和释放同步状态
true : 获取成功 , 设置锁状态 , 直接返回不用线程阻塞 , 自旋直到获得同步状态成功
public final void acquire(int arg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
共享式获取和释放同步状态
// 首先调用至少一次tryacquisharered
//
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 上述调用失败 , 线程可能会进入队列反复阻塞和解除阻塞
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire 检查并更新未能获取的节点的状态。如果线程阻塞,则返回true
// parkAndCheckInterrupt : 中断线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire 的作用
查询同步队列中的等待线程情况
// 自旋处理流程:
for (;;) {
// 获取当前线程的前驱节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 当前线程的前驱节点是头结点,且同步状态成功
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取失败,线程等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
AQS 在处理中有2处值得深入的过程 : 阻塞和唤醒
// 阻塞发生在获取对应同步方法同步失败后 ,流程为 :
Step Start : 获取同步状态 -> 获取失败 -> 检查该状态 shouldParkAfterFailedAcquire(Node pred, Node node)
Step 2 : 返回true -> 当前线程应该被柱塞
Step 3 : parkAndCheckInterrupt() 阻塞线程
- 调用 LockSupport#park(Object blocker) 方法,将当前线程挂起,此时就进入阻塞等待唤醒的状态
// 后续将进行线程的唤醒操作 , 唤醒分为2种
第一种,当前节点(线程)的前序节点释放同步状态时,唤醒了该线程
第二种,当前线程被打断导致唤醒。
Step Start : 当线程释放同步状态后 , 唤醒该线程的后继节点 (unparkSuccessor)
Step 2 : 后继节点存在 , 唤醒后继节点 LockSupport.unpark(s.thread)
Step 3 : 如果后继节点为null (超时 , 中断) , 通过 tail 回溯的方式找到最前序的可用的线程
// 补充 :
> park() : 阻塞当前线程
> park(Object blocker) : 为了线程调度 , 在许可可用之前兼用当前线程
> unpark : 如果给定线程的许可尚不可用 , 则使其可用
> parkNanos(long nanos) :为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用
- park 方法和 unpark(Thread thread) 方法,都是成对出现的
- unpark(Thread thread) 方法,必须要在 park 方法执行之后执行
> 简介 : CLH 同步队列是一个 FIFO 双向队列,AQS 依赖它来完成同步状态的管理
> 2种状态 :
• 当线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
• 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
C- Node : AbstractQueuedSynchronizer 的内部静态类
SF- Node SHARED = new Node();
SF- Node EXCLUSIVE = null;
SF- int CANCELLED = 1;
SF- int SIGNAL = -1;
SF- int CONDITION = -2;
SF- int PROPAGATE = -3;
F- volatile int waitStatus -- CANCELLED SIGNAL CONDITION PROPAGATE INITAL 总共 5 种状态 , 其中 INITAL 是初始状态
F- volatile Node prev; -- 指向前一个节点
F- volatile Node next; -- 指向后一个节点
F- volatile Thread thread; -- Node 节点对应的线程 Thread
F- Node nextWaiter; -- Node 节点获取同步状态的模型( Mode )
M- tryAcquire : 独占式获取同步状态
M- tryAcquireShared : 共享式获取同步状态
M- addWaiter : 入队
M- isShared : 判断是否为共享式获取同步状态
M- predecessor : 获得 Node 节点的前一个 Node 节点
// 属性详情 :
Node 中包含链表常用的2个概念 : prev , next , 同步器中包含2个属性 head , tail 分别指向队列的头和尾 ,
> 入列 :
M- addWaiter
- 准备新节点 -> 记录尾节点 -> 将新节点放入尾节点 -> CAS 设置新的尾节点
- 失败反复尝试 , 直到成功
> 出列 :
- 首节点的线程释放同步状态后,将会唤醒它的下一个节点(Node.next)。后继节点将会在获取同步状态成功时,将自己设置为首节点( head )
- setHead
- 该操作为单线程操作
// 原理简述 :
AQS 有2个属性
注意:如果head存在,它的waitStatus保证不存在
AQS 有几个重要的方法
仅通过acquire方法调用。
将未使用的字段置空 (GC 及效率)
AQS 中有个内部类 Node , 他是节点对象 , 它其中有四个属性表示状态
它还有如下几个重要的属性
@ https://github.com/black-ant/case/tree/master/case%20Module%20Thread/case%20AQS
public class SimpleLock extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int unused) {
//使用compareAndSetState控制AQS中的同步变量
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
//使用setState控制AQS中的同步变量
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
}
// 分析
1 [ main] try tryAcquire :1 <-------
2 [ main] cas success
3 [ Thread-52] try tryAcquire :1 <-------
4 [ Thread-52] try tryAcquire :1 <-------
5 [ Thread-52] try tryAcquire :1 <-------
6 [ Thread-53] try tryAcquire :1 <-------
16 [ main] try tryRelease :1 <-------
17 [ Thread-52] try tryAcquire :1 <-------
18 [ Thread-52] cas success
19 [ Thread-52] c.g.s.thread.aqs.demo.logic.StartLogic : ------> acquired the lock! <-------
20 [ Thread-52] try tryRelease :1 <-------
21 [ Thread-53] try tryAcquire :1 <-------
22 [ Thread-53] cas success
23 [ Thread-53] c.g.s.thread.aqs.demo.logic.StartLogic : ------> acquired the lock! <-------
24 [ Thread-53] try tryRelease :1 <-------
// 第2行 : main 线程获取了独占锁 , 导致后续3-6行的线程全部无法获取锁 , 排在队列中
// 第16行 : main 释放了锁 , 所以从 17-20 行 ,是 Thread-52 的操作流程 (后面可以看到 53 的队列流)
// AQS 使用如上文所示 , 通常要实现 tryAcquire 和 tryRelease
TODO
阅读量:2018
点赞量:0
收藏量:0