Lock 接口是一切的基础 , 它抽象类一种用于控制多个线程对共享资源的访问的工具 .
> 提供了以下方法用于抽象整个业务 :
> Lock 接口提供了区别于隐式监视锁更多的功能 :
> 内存同步 :
ReentranLock 即重入锁 , 表示在单个线程内,这个锁可以反复进入,也就是说,一个线程可以连续两次获得同一把锁 .
ReentranLock 比 synchronized 提供更具拓展行的锁操作。它允许更灵活的结构,可以具有完全不同的性质,并且可以支持多个相关类的条件对象。
它的优势有:
它的特点有:
ReentranLock 基本使用
private Lock lock = new ReentrantLock();
public void test() {
lock.lock();
for (int i = 0; i < 5; i++) {
logger.info("------> CurrentThread [{}] , i : [{}] <-------", Thread.currentThread().getName(), i);
}
lock.unlock();
}
Sync 是 ReentranLock 的内部抽象类 , 其后续会用来实现两种不同的锁 , 这里先看看Sync 内部做了什么
Node 1 : 继承于AbstractQueuedSynchronizer 又名 AQS , 这些大家就知道它了 , Sync 使用aqs state 来表示锁上的持有数
abstract static class Sync extends AbstractQueuedSynchronizer
Node 2 : 有一个抽象方法 lock , 后续的公平和非公平会分别实现对应的方法
abstract void lock();
// ?- 非公平锁的同步对象
static final class NonfairSync extends Sync
> 区别方法 : final void lock() : 对比公平锁有一个修改state 的操作 , 修改成功则设置当前拥有独占访问权限的线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// ?- 公平锁同步对象
static final class FairSync extends Sync
> 区别方法 : tryAcquire(int acquires) , 其中最大的缺别在于会查询是否有线程等待获取的时间长于当前线程
Node 3 : nonfairTryAcquire 方法干了什么
final boolean nonfairTryAcquire(int acquires) {
// 获取当前 Thread 和状态
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// CAS 设置状态
if (compareAndSetState(0, acquires)) {
// 设置当前拥有独占访问权限的线程
// null 表示没有线程获取了访问权限
setExclusiveOwnerThread(current);
return true;
}
}
// 返回由 setExclusiveOwnerThread 设置的最后一个线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
Node 3 : tryRelease 释放
// 重要可以看到2个操作 : setExclusiveOwnerThread + setState
// setExclusiveOwnerThread 为 null 表示没有线程获取了访问权限
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
相同点
不同点
总结 : 在 synchronized 优化以前,它的性能是比 ReenTrantLock 差很多的,但是自从 synchronized 引入了偏向锁,轻量级锁(自旋锁)后,两者的性能就差不多了 .
在两种方法都可用的情况下,官方甚至建议使用 synchronized 。
并且,实际代码实战中,可能的优化场景是,通过读写分离,进一步性能的提升,所以使用 ReentrantReadWriteLock
// 常用方法 :
- void lock()
- Condition newCondition()
- boolean tryLock()
- void unlock()
--------------
// Node 1 : 基础于Lock 接口 , 并且支持序列化
ReentrantLock implements Lock, java.io.Serializable
--------------
// Node 2 : 内部类 , ReentrantLock 中有几个很重要的 sync 类 , Sync 是同步控制的基础
--------------
// Node 3 : 公平非公平的切换方式
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
--------------
// Node 4 : Lock 方法的实现 , 默认调用 NonfairSync
public void lock() {
sync.lock();
}
--------------
// Node 5 : lockInterruptibly 的实现方式
sync.acquireInterruptibly(1);
// Node 6 :
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
读写锁是用来提升并发程序性能的锁分离技术的 Lock 实现类。可以用于 “多读少写” 的场景,读写锁支持多个读操作并发执行,写操作只能由一个线程来操作。
ReadWriteLock 对向数据结构相对不频繁地写入,但是有多个任务要经常读取这个数据结构的这类情况进行了优化。
ReadWriteLock 使得你可以同时有多个读取者,只要它们都不试图写入即可。如果写锁已经被其他任务持有,那么任何读取者都不能访问,直至这个写锁被释放为止。
ReadWriteLock 对程序性能的提高主要受制于如下几个因素:
特征 :
深入 ReadWriteLock :
ReadWriteLock 是一个接口 , 它仅仅提供了2个方法 :
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
重入锁 ReentrantLock 是排他锁,排他锁在同一时刻仅有一个线程可以进行访问 , ReentrantReadWriteLock 则是可重入的读写锁实现类 , 只要没有线程 writer , 读取锁可以由多个 Reader 线程同时保持
I- ReadWriteLock
M- Lock readLock();
M- Lock writeLock();
C- ReentrantReadWriteLock : 可重入的读写锁实现类
I- ReadWriteLock
?- 内部维护了一对相关的锁,一个用于只读操作,另一个用于写入操作 , 写锁是独占的,读锁是共享的
使用案例 :
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
if (!cacheValid) {
data = "test";
cacheValid = true;
}
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
}
Node 1 : 内部提供了2个内部属性 , 这也就是为什么能做到独写锁分离
// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
Node 2 : 再次出现的 Sync , 老规矩 , Sync 还是通过 fair 去判断创建
final Sync sync;
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
Node 3 : Sync 内部状态控制
// 读取和写入计数提取常量和函数。Lock state在逻辑上分为两个 :
// 较低的(低16)表示排他(写入)锁保持计数,较高的(高16)表示共享(读取)锁保持计数。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
计数的方式 :
// 获得持有读状态的锁的线程数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
读状态,等于 S >>> 16 (无符号补 0 右移 16 位)
// 获得持有写状态的锁的次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
写状态,等于 S & 0x0000FFFF(将高 16 位全部抹去)
Node 4 : HoldCounter 类的作用 : 每个读线程需要单独计数用于重入
// 每个线程读取保持计数的计数器。作为ThreadLocal维护 , 缓存在cachedHoldCounter
static final class HoldCounter {
int count = 0;
// 非引用有助于垃圾回收
final long tid = getThreadId(Thread.currentThread());
}
// 成功获取readLock的最后一个线程的保持计数
private transient HoldCounter cachedHoldCounter;
Node 5 : ThreadLocalHoldCounter , 为了反序列化机制
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 当前线程持有的可重入读锁的数量。仅在构造函数和readObject中初始化。当线程的读保持计数下降到0时删除
private transient ThreadLocalHoldCounter readHolds;
Node 6 : Sync 内部类
NonfairSync : 不公平锁
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
// 如果暂时出现在队列头的线程(如果存在)是正在等待的写入器,则阻塞
// 如果在其他已启用的、尚未从队列中耗尽的读取器后面有一个正在等待的写入器,那么新的读取器将不会阻塞
return apparentlyFirstQueuedIsExclusive();
}
FairSync : 公平锁
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
在 Java SE 5 后,Java 提供了 Lock 接口,相对于 synchronized 而言,Lock 提供了条件 Condition ,对线程的等待、唤醒操作更加详细和灵活
> AQS 等待队列与 Condition 队列是两个相互独立的队列
#await() 就是在当前线程持有锁的基础上释放锁资源,并新建 Condition 节点加入到 Condition 的队列尾部,阻塞当前线程 。
#signal() 就是将 Condition 的头节点移动到 AQS 等待节点尾部,让其等待再次获取锁。
Condition 其实是一个接口 , 其在 AQS 中存在一个是实现类 , ConditionObject , 我们就主要说说它 :
Node 1 : 属性对象
// condition queue 第一个节点
private transient Node firstWaiter;
// condition queue 最后一个节点
private transient Node lastWaiter;
Node 2 : 核心方法 doSignal + doSignalAll
// doSignal : 删除和传输节点,直到碰到非取消的1或null
private void doSignal(Node first) {
do {
// 先判断是否为头节点或者null
// 注意其中的 = 是赋值 : !transferForSignal(first) &&(first = firstWaiter) != null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// transferForSignal 是 AQS 方法, 将节点从条件队列转移到同步队列 , 主要是 CAS 操作修改状态
// Node p = enq(node); 这里面是一个Node 拼接操作 , 其实可以理解为已经将 Node 加入对应的队列里面了
} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
// doSignalAll : 删除和传输所有节点 , 注意区别 , 这里不像 Notify 是通知是有线程去获取
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
Node 3 : 主要方法 await
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// Step 1 : 添加到 Condition 队列
Node node = addConditionWaiter();
// Step 2 : 使用当前状态值调用release , 且返回保存的状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果一个节点(总是最初放置在条件队列上的节点)现在正在同步队列上等待重新获取,则返回true
// 即如果有节点等待
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 以独占不可中断模式获取已经在队列中的线程。
// 用于条件等待方法和获取方法。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// awaitNanos(long nanosTimeout) : 定时条件等待
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
// awaitUntil(Date deadline) : 实现绝对定时条件等待 , 即一个定时操作
// 超时后直接传输节点
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
Node 4 : Release 方法
// 使用当前状态值调用release;返回保存的状态。
// 取消节点并在失败时抛出异常。
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取状态
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 设置当前状态为取消
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
// 实现具体重写
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Node 5 : 其他方法 :
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
// 保存的状态作为参数 , 如果失败,抛出IllegalMonitorStateException
int savedState = fullyRelease(node);
boolean interrupted = false;
// 阻塞直到有信号
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
// acquireQueued : 保存的state作为参数 , 以独占不可中断模式获取已经在队列中的线程 , 重新获取
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
阅读量:2011
点赞量:0
收藏量:0