在并发编程中,我们经常用到的是synchronized和ReentrantLock。其中,synchronized是jvm内置锁,而ReentrantLock位于java.util.concurrent包下(以下简称JUC),ReentrantLock是基于AbstractQueuedSynchronizer(以下简称AQS)同步器框架实现的,本文主要来介绍AQS的内部实现及在JUC中基于AQS实现的相关类。
AQS是一个抽象类,内部维护一个state变量(代表共享资源)、一个FIFO等待队列(用来获取共享资源、线程排队管理等)。AQS定义了两种访问共享资源的方式:Exclusive(独占方式,每次只有一个线程访问资源,如:ReentrantLock)、Share(共享方式,多个线程可以同时访问资源,如:Semaphore、CountDownLatch等)。不管是独占方式还是共享方式,其具体实现只需要实现共享资源state的获取和释放即可,等待队列的相关操作(如获取锁失败入队、唤醒出队等),AQS已经实现好了。
注:虽然AQS提供了独占和共享两种方式访问共享资源,但是两者并不一定是互斥的,还可以是独占和共享共存的方式访问共享资源,如ReentrantReadWriteLock(后面单独分析)。
Node是AQS中的一个静态内部类,上面说到AQS中维护了一个FIFO的双端等待队列,当获取锁失败时,AQS会将当前线程及等待状态等信息构造成一个节点Node,加入到等待队列的队尾,并且当前线程变成阻塞状态,等待唤醒。
//等待队列的Head节点
private transient volatile Node head;
//等待队列的Tail节点
private transient volatile Node tail;
static final class Node {
//共享锁对应节点
static final Node SHARED = new Node();
//独占锁对应节点
static final Node EXCLUSIVE = null;
//等待状态
volatile int waitStatus;
Node nextWaiter;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当前线程
volatile Thread thread;
final boolean isShared() {
return nextWaiter == SHARED;
}
//找到当前节点的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
//在addWaiter()时调用
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
//在Condition中调用
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
· waitStatus:当前线程在等待队列中的状态
| waitStatus | 值 | 功能 |
| SIGNAL | -1 | 当前节点的后继节点被阻塞(通过park)时,设置其前驱节点的状态为SIGNAL。标记为SIGNAL节点的线程释放锁时就会通知后继节点,使得后继节点所在的线程被唤醒。这个状态一般是后继节点来设置前驱节点的。 |
| CANCELLED | 1 | 由于超时(timeout)或中断(interrupt),队列中此节点被取消。节点进入此状态后不会再变化。 |
| CONDITION | -2 | 此节点当前位于条件队列(condition queue)中。当其他线程对这个Condition调用signal方法后,它会被转移同步队列(sync queue)中。 |
| PROPAGATE | -3 | 已发布的节点应该传播到其他节点。这在doReleaseShared中设置(仅针对head节点),以确保传播继续进行,即使其他操作已经介入。 |
| 0 | 0 | 以上的都不是,默认初始无锁状态 |
· prev :等待队列(Sync Queue)中前驱节点
· next :等待队列(Sync Queue)中后继节点
· thread : 当前线程
· nextWaiter:条件队列(Condition Queue)的后继节点
注:等待队列(Sync Queue)在独占模式、共享模式中都会使用到,条件队列(Condition Queue)只会在独占锁中使用。如:
ReentrantLock lock = new ReentrantLock();Condition condition = lock.newCondition();condition.await();condition.signal();通过ReentrantLock可以构造Condition,调用condition的await/signal方法时会把当前线程在Condition Queue中执行入队/出队操作。

当调用condition.await()时,Node节点就会从Sync Queue中进入到Condition Queue中,且对应的pre、next都会被置为null,waitStatus变为CONDITION,并通过nextWaiter形成链表;
而当调用condition.signal()时,Node又会从Condition Queue进入到Sync Queue中,pre、next重新设置,waitStatus根据当前状态进行设置,nextWaiter会被置为null。
注:当调用多个lock.newCondition()时,就会产生多个Condition队列,即一个ReentrantLock可以对应多个Condition Queue。
state表示当前锁的状态,state=0是初始状态,即无锁状态; 当state>0 表示已经有线程获得了锁,当同一个线程多次获得同步锁时(可重入性),state会递增,而在释放锁的时候,state会进行递减直到state=0时,其他线程才有资格获取锁。独占模式下只有一个线程能够获取同步锁,而共享模式下可以有多个线程可以获取同步锁。
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
//通过CAS方式更新state值
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
继承AQS的同步器主要实现:
独占锁需要实现:
· isHeldExclusively():是否是独占资源,当用到condition时需要实现它。
· tryAcquire(int arg):独占锁方式,尝试获取资源,成功返回true,否则返回false。
· tryRelease(int arg):独占锁方式,释放资源,如果释放操作使得所有获取同步器时被阻塞的线程恢复执行,那么返回的是true,否则返回false。
共享锁需要实现:
· tryAcquireShared(int arg):共享方式获取资源,负数表示获取操作失败;0代表成功,但无剩余资源;正数代表成功,且有剩余资源。
· tryReleaseShared(int arg):共享方式尝试释放资源,如果释放后允许唤醒后续节点返回true,否则返回false。
我们来看下上述方法在AQS中的实现:
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
无论是独占锁还是共享锁,获取锁和释放锁在AQS中都是没有实现的,需要在自定义同步器中实现
独占锁相关:
· acquire(int arg): 独占模式下尝试获取共享资源,忽略中断操作。获取成功返回true,否则线程进入等待队列中,等待被唤醒后继续尝试获取共享资源直到成功为止。本方法可以在实现Lock.lock()中使用。acquire()在AQS中的源码:
//tryAcquire尝试获取资源(在子类中实现),如成功直接返回,否则通过addWaiter及acquireQueued将该线程加入到等待队列的队尾,标记为独占模式,如果该线程在等待过程中中断过,acquireQueued方法会返回true,否则返回false。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果tail为空(即队列为空),或通过CAS设置node到队尾失败,继续通过enq()循环设置node到队尾,直至成功为止
enq(node);
return node;
}
//将node设置到队列尾部
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
*1、当前node节点的前驱节点是SIGNAL状态,返回true,意味着当前线程会被挂起,阻塞等待。
*2、如果前驱节点是CANCEL状态(waitStatus>0),跳过此节点并删除,继续往上找,直到找到不是CANCEL状态的节点作为其前驱节点为止;如果前驱节点是0或者PROPAGATE状态,则将前驱节点设置为SIGNAL状态,则在下一次执行循环时shouldParkAfterFailedAcquire可以直接返回true,挂起线程。
**/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
· acquireInterruptibly(int arg) : 同acquire方法,区别在于如果收到中断通知会直接中断。本方法可以在Lock.lockInterruptibly()中使用。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
基于AQS构建的同步器类中,最基本的操作就是各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,通常会阻塞;释放操作时被阻塞的线程会重新开始执行。前面也说到了,AQS通过管理state变量来进行相关操作,跟state相关的方法有getState、setState、compareAndSetState,并且state在不同实现AQS的子类中代表的意思也各不相同。如:state 在 ReentrantLock中代表的是锁所有者线程重复获取该锁的次数;Semaphore 中 state代表剩余的许可数量;FutureTask中state用来表示任务的状态(未开始、正在运行、已完成、已取消)。
· ReentrantLock:只支持独占方式获取锁,所以内部只实现了tryAcquire、tryRelease、isHeldExclusively。ReentrantLock利用AQS对多个条件变量、多个等待线程集内置支持。Lock.newCondition返回一个新的ConditionObject实例。
· CountDownLatch:用于保存当前许可的数量。获取操作意味着“等待并直到闭锁到达结束状态”
· Semaphore:Semaphore翻译为信号量,是一种共享锁,可以同时允许一个或多个线程同时共享资源。
· CyclicBarrier:CyclicBarrier意为循环栅栏,可以实现一组线程等待至某个状态之后再全部继续执行。当所有线程执行完毕后,CyclicBarrier还可以继续被重用。
· ReentrantReadWriteLock:ReentrantReadWriteLock是独占锁(写锁)、共享锁(读锁)可以同时存在的一种读写锁,在读操作远大于写操作的场景中,能实现更好的并发性。
· ThreadPoolExecutor:线程池,内部实现使用的ReentrantLock。另外Android中AsyncTask内部实现即是ThreadPoolExecutor。
获取锁的流程:
· AQS 的模板方法 acquire 通过调用子类自定义实现的 tryAcquire 获取锁;
· 如果获取锁失败,通过 addWaiter 方法将线程构造成 Node 节点插入到同步队列队尾;
· 在 acquirQueued 方法中以自旋的方法尝试获取锁,如果失败则判断是否需要将当前线程阻塞,如果需要阻塞则最终执行 LockSupport(Unsafe) 中的 native API 来实现线程阻塞。
释放锁的流程:
· 首先获取当前节点(实际上传入的是 head 节点)的状态,如果 head 节点的下一个节点是 null,或者下一个节点的状态为 CANCEL(不用再唤醒了),则直接从等待队列的尾部开始遍历,一直遍历到最前面的 waitStatus 小于等于 0 的节点(大于0的状态是CANCEL状态)。
· 如果最终遍历到的节点不为 null,再调用 LockSupport.unpark 方法,调用底层方法唤醒线程。
阅读量:2029
点赞量:0
收藏量:0