Java 多线程 : 勉强弄懂了AQS-灵析社区

带鱼

一 . AQS 基础

一句话概括AQS :
  • 一个叫 AbstractQueuedSynchronizer 的抽象类
  • 包含2个重要概念 : 以Node为节点实现的链表的队列(CHL队列) + STATE标志
  • 支持2种锁 : 独占锁和共享锁 ,

1.1 什么是 AQS ?

java.util.concurrent.locks.AbstractQueuedSynchronizer 抽象类,简称 AQS , 队列同步器

作用 :  用于构建锁和同步容器的同步器
原理 :  使用一个 FIFO 的队列表示排队等待锁的线程

  • 队列头节点称作“哨兵节点”或者“哑节点”,不与任何线程关联。
  • 其他的节点与等待线程关联,每个节点维护一个等待状态 waitStatus

场景 :  AQS解决了实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列

1.2 AQS 的 status 表示了什么 ?

AQS 使用一个int 的 status 来表示同步状态, 同步状态重要目的是用于跟踪线程是否应该阻塞 , 当它的前身释放时,一个节点被通知。否则,队列中的每个节点都充当一个特定通知样式的监视器,该监视器持有单个等待线程.

  • status > 0 : 获取了锁
  • status = 0 : 释放了锁
  • status < 0 :

1.3 常用方法:

状态处理
  • getState():返回同步状态的当前值。
  • setState(int newState):设置当前同步状态。
  • compareAndSetState(int expect, int update):使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。
独占锁相关方法
  • 【可重写】#tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态。
  • 【可重写】#tryRelease(int arg):独占式释放同步状态。
共享锁相关方法
  • 【可重写】#tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于 0 ,则表示获取成功;否则,获取失败。
  • 【可重写】#tryReleaseShared(int arg):共享式释放同步状态。
  • 【可重写】#isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占。
独占式获取同步状态
  • acquire(int arg):独占式获取同步状态。
      如果当前线程获取同步状态成功,则由该方法返回;否则,将会进入同步队列等待。该方法将会调用可重写的       #tryAcquire(int arg) 方法;
  • acquireInterruptibly(int arg):与 #acquire(int arg) 相同,但是该方法响应中断。

  当前线程为获取到同步状态而进入到同步队列中

  如果当前线程被中断,则抛出 InterruptedException()

  如果未中断 ,将尝试调用 tryAcquire , 调用失败线程将进入队列,可能会反复阻塞和解除阻塞

  • tryAcquireNanos(int arg, long nanos):超时获取同步状态。

  如果当前线程被中断,则抛出 InterruptedException()

  如果当前线程在 nanos 时间内没有获取到同步状态,那么将会返回 false ,已经获取则返回 true 。

  未超时未获取会一只排队 ,反复阻塞

共享式获取同步状态
  • acquireShared(int arg):共享式获取同步状态
      如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断。
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制。
释放同步状态
  • release(int arg):独占式释放同步状态
      该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。
  • releaseShared(int arg):共享式释放同步状态。
public class SimpleLock extends AbstractQueuedSynchronizer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    protected boolean tryAcquire(int unused) {
        logger.info("------> try tryAcquire :{}  <-------", unused);
        //使用compareAndSetState控制AQS中的同步变量
        if (compareAndSetState(0, 1)) {
            logger.info("------> cas success ");
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        logger.info("------> try tryRelease :{} <-------", unused);
        setExclusiveOwnerThread(null);
        //使用setState控制AQS中的同步变量
        setState(0);
        return true;
    }

    public void lock() {
        acquire(1);
    }

    public boolean tryLock() {
        return tryAcquire(1);
    }

    public void unlock() {
        release(1);
    }

}
其他关联知识点 : 1 AQS的所有子类中,要么使用了它的独占锁,要么使用了它的共享锁,不会同时使用它的两个锁。

二 .  AQS 原理

2.1  基本原理

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中

  • AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。
  • AQS使用CAS对该同步状态进行原子操作实现对其值的修改。
  • 在await之后,一个节点被插入到条件队列中(可见后面代码)。收到信号后,节点被转移到主队列
CLH(Craig,Landin,and Hagersten)队列 该队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
AQS定义两种资源共享方式 Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
  • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
  • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的

Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。

2.1  AQS底层使用了模板方法模式

2.1.1 模板方法详情

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
  2. 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:
以下方法未重写抛出 UnsupportedOperationException

  • isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

2.1.2 常见的实现案例

Semaphore(信号量)

功能 :  允许多个线程同时访问

synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。

CountDownLatch (倒计时器) 功能 : CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。

这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。

CyclicBarrier(循环栅栏)

功能 :  CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。

应用场景 : 和 CountDownLatch 类似。CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。

作用 : 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

三 .  AQS 同步状态的获取和释放

独占式获取和释放同步状态
  • 同一时刻,仅有一个线程持有同步状态
  • acquire(int arg) : 该方法对中断不敏感 , 即后续对该线程进行中断操作时,线程不会从 CLH 同步队列中移除
  • tryAcquire(int arg) : 去尝试获取同步状态

  true : 获取成功 , 设置锁状态 , 直接返回不用线程阻塞 , 自旋直到获得同步状态成功

  •     false : 获取失败 , 用#addWaiter(Node mode) 方法,将当前线程加入到 CLH 同步队列尾部,mode 方法参数为Node.EXCLUSIVE
    • acquireQueued : 自旋直到获得同步状态成功
    
    public final void acquire(int arg) {
    	if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    		selfInterrupt();
    }
    
    共享式获取和释放同步状态
    
    // 首先调用至少一次tryacquisharered
    // 
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    
    // 上述调用失败 , 线程可能会进入队列反复阻塞和解除阻塞
        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    // shouldParkAfterFailedAcquire 检查并更新未能获取的节点的状态。如果线程阻塞,则返回true
                   // parkAndCheckInterrupt : 中断线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    
    shouldParkAfterFailedAcquire 的作用
    • 等待状态为 Node.SIGNAL 时,表示 pred 的下一个节点 node 的线程需要阻塞等待。在pred 的线程释放同步状态时,会对 node 的线程进行唤醒通知
    • 等待状态为 0 或者 Node.PROPAGATE 时,通过 CAS 设置,将状态修改为 Node.SIGNAL
    • 等待状态为 NODE.CANCELLED 时,则表明该线程的前一个节点已经等待超时或者被中断了,则需要从 CLH 队列中将该前一个节点删除掉,循环回溯,直到前一个节点状态 <= 0
    查询同步队列中的等待线程情况
     // 自旋处理流程: 
            for (;;) {
                	// 获取当前线程的前驱节点
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        // 当前线程的前驱节点是头结点,且同步状态成功
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                	//  获取失败,线程等待
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }   
    
    
    
    

    四. 阻塞和唤醒线程

    AQS 在处理中有2处值得深入的过程 : 阻塞和唤醒
    
    // 阻塞发生在获取对应同步方法同步失败后 ,流程为 : 
    Step Start : 获取同步状态 -> 获取失败 -> 检查该状态 shouldParkAfterFailedAcquire(Node pred, Node node) 
    Step 2 : 返回true -> 当前线程应该被柱塞
    Step 3 : parkAndCheckInterrupt() 阻塞线程
        - 调用 LockSupport#park(Object blocker) 方法,将当前线程挂起,此时就进入阻塞等待唤醒的状态
        
    // 后续将进行线程的唤醒操作 , 唤醒分为2种    
    第一种,当前节点(线程)的前序节点释放同步状态时,唤醒了该线程
    第二种,当前线程被打断导致唤醒。
    
    Step Start : 当线程释放同步状态后 , 唤醒该线程的后继节点 (unparkSuccessor)
    Step 2 : 后继节点存在 , 唤醒后继节点 LockSupport.unpark(s.thread)
    Step 3 : 如果后继节点为null (超时 , 中断)  , 通过 tail 回溯的方式找到最前序的可用的线程
        
    // 补充 : 
    > park() : 阻塞当前线程
    > park(Object blocker) : 为了线程调度 , 在许可可用之前兼用当前线程  
    > unpark : 如果给定线程的许可尚不可用 , 则使其可用
    > parkNanos(long nanos) :为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用    
        
    - park 方法和 unpark(Thread thread) 方法,都是成对出现的
    - unpark(Thread thread) 方法,必须要在 park 方法执行之后执行
        
        
    

    五 . CLH 同步队列

    > 简介 : CLH 同步队列是一个 FIFO 双向队列,AQS 依赖它来完成同步状态的管理
    
    > 2种状态 : 
    	• 当线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
    	• 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
    
    C- Node :  AbstractQueuedSynchronizer 的内部静态类
    	SF- Node SHARED = new Node();
    	SF- Node EXCLUSIVE = null;
    	SF- int CANCELLED =  1;
    	SF- int SIGNAL    = -1;
    	SF- int CONDITION = -2;
    	SF- int PROPAGATE = -3;
    	F- volatile int waitStatus --  CANCELLED SIGNAL CONDITION PROPAGATE INITAL 总共 5 种状态 , 其中 INITAL 是初始状态
    	F- volatile Node prev; -- 指向前一个节点
    	F- volatile Node next; -- 指向后一个节点
    	F- volatile Thread thread; -- Node 节点对应的线程 Thread
    	F- Node nextWaiter; -- Node 节点获取同步状态的模型( Mode )
    	M- tryAcquire : 独占式获取同步状态
    	M- tryAcquireShared : 共享式获取同步状态
    	M- addWaiter : 入队
    	M- isShared : 判断是否为共享式获取同步状态
    	M- predecessor : 获得 Node 节点的前一个 Node 节点     
            
            
    // 属性详情 : 
    Node 中包含链表常用的2个概念 : prev , next  , 同步器中包含2个属性 head , tail 分别指向队列的头和尾 , 
    
    	
    > 入列 : 
    	M- addWaiter
    		- 准备新节点 -> 记录尾节点 -> 将新节点放入尾节点 -> CAS 设置新的尾节点
    		- 失败反复尝试 , 直到成功
    
    
    > 出列 : 
    	- 首节点的线程释放同步状态后,将会唤醒它的下一个节点(Node.next)。后继节点将会在获取同步状态成功时,将自己设置为首节点( head )
    		- setHead
    	- 该操作为单线程操作
    	
    // 原理简述 : 
            
    	
    	
    

    六. AQS 源码

    9.6.1 问题一 : CLH 的形式

    AQS 有2个属性

    • private transient volatile Node head;
        等待队列的头,懒加载。初始化后只能通过sehead方法进行修改。

      注意:如果head存在,它的waitStatus保证不存在

    • private transient volatile Node tail;
        尾部的等待队列,懒加载

    AQS 有几个重要的方法

    • private Node addWaiter(Node mode)
        为当前线程和给定模式创建并进入节点队列
    • private void setHead(Node node)
      设置队列头为节点,退出队列。

      仅通过acquire方法调用。

      将未使用的字段置空 (GC 及效率)

    • private Node enq(final Node node)
        将节点插入队列,必要时进行初始化

    9.6.2 问题二 : Node 节点

    AQS 中有个内部类 Node , 他是节点对象 , 它其中有四个属性表示状态

    • SIGNAL :该节点的后继节点被阻塞(或即将被阻塞)(通过park),因此当前节点在释放或取消后继节点时必须解除它的后继节点的阻塞
    • CANCELLED : 该节点因超时或中断而被取消
    • CONDITION : 该节点当前在条件队列中。它将不会被用作同步队列节点,直到传输时,状态将被设置为0
    • PROPAGATE : 这个 releaseShared 应该传播到其他节点

    它还有如下几个重要的属性

    • volatile Node prev : 前一个节点
    • volatile Node next : 下一个节点
    • volatile Thread thread : 当前线程

    9.6.3 AQS 状态

    • private volatile int state;

    9.6.5 AQS 流程图

    七 . AQS 使用

    @ https://github.com/black-ant/case/tree/master/case%20Module%20Thread/case%20AQS
    
    public class SimpleLock extends AbstractQueuedSynchronizer {
    
        @Override
        protected boolean tryAcquire(int unused) {
            //使用compareAndSetState控制AQS中的同步变量
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            //使用setState控制AQS中的同步变量
            setState(0);
            return true;
        }
    
        public void lock() {
            acquire(1);
        }
    
        public boolean tryLock() {
            return tryAcquire(1);
        }
    
        public void unlock() {
            release(1);
        }
    
    }
    
    // 分析
    1	 [           main] try tryAcquire :1  <-------
    2	 [           main] cas success 
    3	 [      Thread-52] try tryAcquire :1  <-------
    4	 [      Thread-52] try tryAcquire :1  <-------
    5	 [      Thread-52] try tryAcquire :1  <-------
    6	 [      Thread-53] try tryAcquire :1  <-------
    
    16	 [           main] try tryRelease :1 <-------
    17	 [      Thread-52] try tryAcquire :1  <-------
    18	 [      Thread-52] cas success 
    19	 [      Thread-52] c.g.s.thread.aqs.demo.logic.StartLogic   : ------> acquired the lock! <-------
    20	 [      Thread-52] try tryRelease :1 <-------
    21	 [      Thread-53] try tryAcquire :1  <-------
    22	 [      Thread-53] cas success 
    23	 [      Thread-53] c.g.s.thread.aqs.demo.logic.StartLogic   : ------> acquired the lock! <-------
    24	 [      Thread-53] try tryRelease :1 <-------
        
        
    // 第2行 : main 线程获取了独占锁 , 导致后续3-6行的线程全部无法获取锁 , 排在队列中
    // 第16行 : main 释放了锁 , 所以从 17-20 行 ,是 Thread-52 的操作流程 (后面可以看到 53 的队列流)
    
    
    
    // AQS 使用如上文所示 , 通常要实现 tryAcquire 和 tryRelease
    TODO 
    


    阅读量:2018

    点赞量:0

    收藏量:0