之前的队列在很多场景下都不能很好地工作,例如
因此我们需要解决的问题有
有同学对线程安全还没有足够的认识,下面举一个反例,两个线程都要执行入队操作(几乎在同一时刻)
public class TestThreadUnsafe {
    private final String[] array = new String[10];
    private int tail = 0;
    public void offer(String e) {
        array[tail] = e;
        tail++;
    }
    @Override
    public String toString() {
        return Arrays.toString(array);
    }
    public static void main(String[] args) {
        TestThreadUnsafe queue = new TestThreadUnsafe();
        new Thread(()-> queue.offer("e1"), "t1").start();
        new Thread(()-> queue.offer("e2"), "t2").start();
    }
}执行的时间序列如下,假设初始状态 tail = 0,在执行过程中由于 CPU 在两个线程之间切换,造成了指令交错
| 线程1 | 线程2 | 说明 | 
|---|---|---|
| array[tail]=e1 | 线程1 向 tail 位置加入 e1 这个元素,但还没来得及执行 tail++ | |
| array[tail]=e2 | 线程2 向 tail 位置加入 e2 这个元素,覆盖掉了 e1 | |
| tail++ | tail 自增为1 | |
| tail++ | tail 自增为2 | |
| 最后状态 tail 为 2,数组为 [e2, null, null …] | 
糟糕的是,由于指令交错的顺序不同,得到的结果不止以上一种,宏观上造成混乱的效果
Java 中要防止代码段交错执行,需要使用锁,有两种选择
以 ReentrantLock 为例
ReentrantLock lock = new ReentrantLock();
public void offer(String e) {
    lock.lockInterruptibly();
    try {
        array[tail] = e;
        tail++;
    } finally {
        lock.unlock();
    }
}只要两个线程执行上段代码时,锁对象是同一个,就能保证 try 块内的代码的执行不会出现指令交错现象,即执行顺序只可能是下面两种情况之一
| 线程1 | 线程2 | 说明 | 
|---|---|---|
| lock.lockInterruptibly() | t1对锁对象上锁 | |
| array[tail]=e1 | ||
| lock.lockInterruptibly() | 即使 CPU 切换到线程2,但由于t1已经对该对象上锁,因此线程2卡在这儿进不去 | |
| tail++ | 切换回线程1 执行后续代码 | |
| lock.unlock() | 线程1 解锁 | |
| array[tail]=e2 | 线程2 此时才能获得锁,执行它的代码 | |
| tail++ | 
事情还没有完,上面的例子是队列还没有放满的情况,考虑下面的代码(这回锁同时保护了 tail 和 size 的读写安全)
ReentrantLock lock = new ReentrantLock();
int size = 0;
public void offer(String e) {
    lock.lockInterruptibly();
    try {
        if(isFull()) {
            // 满了怎么办?
        }
        array[tail] = e;
        tail++;
        
        size++;
    } finally {
        lock.unlock();
    }
}
private boolean isFull() {
    return size == array.length;
}之前是返回 false 表示添加失败,前面分析过想达到这么一种效果:
ReentrantLock 可以配合条件变量来实现,代码进化为
ReentrantLock lock = new ReentrantLock();
Condition tailWaits = lock.newCondition(); // 条件变量
int size = 0;
public void offer(String e) {
    lock.lockInterruptibly();
    try {
        while (isFull()) {
            tailWaits.await();	// 当队列满时, 当前线程进入 tailWaits 等待
        }
        array[tail] = e;
        tail++;
        
        size++;
    } finally {
        lock.unlock();
    }
}
private boolean isFull() {
    return size == array.length;
}思考为何要用 while 而不是 if,设队列容量是 3
| 操作前 | offer(4) | offer(5) | poll() | 操作后 | 
|---|---|---|---|---|
| [1 2 3] | 队列满,进入tailWaits 等待 | [1 2 3] | ||
| [1 2 3] | 取走 1,队列不满,唤醒线程 | [2 3] | ||
| [2 3] | 抢先获得锁,发现不满,放入 5 | [2 3 5] | ||
| [2 3 5] | 从上次等待处直接向下执行 | [2 3 5 ?] | 
关键点:
最后的实现代码
/**
 * 单锁实现
 * @param <E> 元素类型
 */
public class BlockingQueue1<E> implements BlockingQueue<E> {
    private final E[] array;
    private int head = 0;
    private int tail = 0;
    private int size = 0; // 元素个数
    @SuppressWarnings("all")
    public BlockingQueue1(int capacity) {
        array = (E[]) new Object[capacity];
    }
    ReentrantLock lock = new ReentrantLock();
    Condition tailWaits = lock.newCondition();
    Condition headWaits = lock.newCondition();
    @Override
    public void offer(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (isFull()) {
                tailWaits.await();
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }
            size++;
            headWaits.signal();
        } finally {
            lock.unlock();
        }
    }
    @Override
    public void offer(E e, long timeout) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long t = TimeUnit.MILLISECONDS.toNanos(timeout);
            while (isFull()) {
                if (t <= 0) {
                    return;
                }
                t = tailWaits.awaitNanos(t);
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }
            size++;
            headWaits.signal();
        } finally {
            lock.unlock();
        }
    }
    @Override
    public E poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (isEmpty()) {
                headWaits.await();
            }
            E e = array[head];
            array[head] = null; // help GC
            if (++head == array.length) {
                head = 0;
            }
            size--;
            tailWaits.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }
    private boolean isEmpty() {
        return size == 0;
    }
    private boolean isFull() {
        return size == array.length;
    }
}注意
JDK 中 BlockingQueue 接口的方法命名与我的示例有些差异
方法 offer(E e) 是非阻塞的实现,阻塞实现方法为 put(E e)
方法 poll() 是非阻塞的实现,阻塞实现方法为 take()
单锁的缺点在于:
如果希望进一步提高性能,可以用两把锁
ReentrantLock headLock = new ReentrantLock();  // 保护 head 的锁
Condition headWaits = headLock.newCondition(); // 队列空时,需要等待的线程集合
ReentrantLock tailLock = new ReentrantLock();  // 保护 tail 的锁
Condition tailWaits = tailLock.newCondition(); // 队列满时,需要等待的线程集合先看看 offer 方法的初步实现
@Override
public void offer(E e) throws InterruptedException {
    tailLock.lockInterruptibly();
    try {
        // 队列满等待
        while (isFull()) {
            tailWaits.await();
        }
        
        // 不满则入队
        array[tail] = e;
        if (++tail == array.length) {
            tail = 0;
        }
        
        // 修改 size (有问题)
        size++;
        
    } finally {
        tailLock.unlock();
    }
}上面代码的缺点是 size 并不受 tailLock 保护,tailLock 与 headLock 是两把不同的锁,并不能实现互斥的效果。因此,size 需要用下面的代码保证原子性
AtomicInteger size = new AtomicInteger(0);	   // 保护 size 的原子变量
size.getAndIncrement(); // 自增
size.getAndDecrement(); // 自减代码修改为
@Override
public void offer(E e) throws InterruptedException {
    tailLock.lockInterruptibly();
    try {
        // 队列满等待
        while (isFull()) {
            tailWaits.await();
        }
        
        // 不满则入队
        array[tail] = e;
        if (++tail == array.length) {
            tail = 0;
        }
        
        // 修改 size
        size.getAndIncrement();
        
    } finally {
        tailLock.unlock();
    }
}对称地,可以写出 poll 方法
@Override
public E poll() throws InterruptedException {
    E e;
    headLock.lockInterruptibly();
    try {
        // 队列空等待
        while (isEmpty()) {
            headWaits.await();
        }
        
        // 不空则出队
        e = array[head];
        if (++head == array.length) {
            head = 0;
        }
        
        // 修改 size
        size.getAndDecrement();
        
    } finally {
        headLock.unlock();
    }
    return e;
}下面来看一个难题,就是如何通知 headWaits 和 tailWaits 中等待的线程,比如 poll 方法拿走一个元素,通知 tailWaits:我拿走一个,不满了噢,你们可以放了,因此代码改为
@Override
public E poll() throws InterruptedException {
    E e;
    headLock.lockInterruptibly();
    try {
        // 队列空等待
        while (isEmpty()) {
            headWaits.await();
        }
        
        // 不空则出队
        e = array[head];
        if (++head == array.length) {
            head = 0;
        }
        
        // 修改 size
        size.getAndDecrement();
        
        // 通知 tailWaits 不满(有问题)
        tailWaits.signal();
        
    } finally {
        headLock.unlock();
    }
    return e;
}问题在于要使用这些条件变量的 await(), signal() 等方法需要先获得与之关联的锁,上面的代码若直接运行会出现以下错误
java.lang.IllegalMonitorStateException那有同学说,加上锁不就行了吗,于是写出了下面的代码

发现什么问题了?两把锁这么嵌套使用,非常容易出现死锁,如下所示

因此得避免嵌套,两段加锁的代码变成了下面平级的样子

性能还可以进一步提升
每次调用 signal() 都需要这些 offer 线程先获得 headLock 锁,成本较高,要想法减少 offer 线程获得 headLock 锁的次数
可以加一个条件:当 offer 增加前队列为空,即从 0 变化到不空,才由此 offer 线程来通知 headWaits,其它情况不归它管
2. 队列从 0 变化到不空,会唤醒一个等待的 poll 线程,这个线程被唤醒后,肯定能拿到 headLock 锁,因此它具备了唤醒 headWaits 上其它 poll 线程的先决条件。如果检查出此时有其它 offer 线程新增了元素(不空,但不是从0变化而来),那么不妨由此 poll 线程来唤醒其它 poll 线程
这个技巧被称之为级联通知(cascading notifies),类似的原因
3. 在 poll 时队列从满变化到不满,才由此 poll 线程来唤醒一个等待的 offer 线程,目的也是为了减少 poll 线程对 tailLock 上锁次数,剩下等待的 offer 线程由这个 offer 线程间接唤醒
最终的代码为
public class BlockingQueue2<E> implements BlockingQueue<E> {
    private final E[] array;
    private int head = 0;
    private int tail = 0;
    private final AtomicInteger size = new AtomicInteger(0);
    ReentrantLock headLock = new ReentrantLock();
    Condition headWaits = headLock.newCondition();
    ReentrantLock tailLock = new ReentrantLock();
    Condition tailWaits = tailLock.newCondition();
    public BlockingQueue2(int capacity) {
        this.array = (E[]) new Object[capacity];
    }
    @Override
    public void offer(E e) throws InterruptedException {
        int c;
        tailLock.lockInterruptibly();
        try {
            while (isFull()) {
                tailWaits.await();
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }            
            c = size.getAndIncrement();
            // a. 队列不满, 但不是从满->不满, 由此offer线程唤醒其它offer线程
            if (c + 1 < array.length) {
                tailWaits.signal();
            }
        } finally {
            tailLock.unlock();
        }
        // b. 从0->不空, 由此offer线程唤醒等待的poll线程
        if (c == 0) {
            headLock.lock();
            try {
                headWaits.signal();
            } finally {
                headLock.unlock();
            }
        }
    }
    @Override
    public E poll() throws InterruptedException {
        E e;
        int c;
        headLock.lockInterruptibly();
        try {
            while (isEmpty()) {
                headWaits.await(); 
            }
            e = array[head]; 
            if (++head == array.length) {
                head = 0;
            }
            c = size.getAndDecrement();
            // b. 队列不空, 但不是从0变化到不空,由此poll线程通知其它poll线程
            if (c > 1) {
                headWaits.signal();
            }
        } finally {
            headLock.unlock();
        }
        // a. 从满->不满, 由此poll线程唤醒等待的offer线程
        if (c == array.length) {
            tailLock.lock();
            try {
                tailWaits.signal();
            } finally {
                tailLock.unlock();
            }
        }
        return e;
    }
    private boolean isEmpty() {
        return size.get() == 0;
    }
    private boolean isFull() {
        return size.get() == array.length;
    }
}双锁实现的非常精巧,据说作者 Doug Lea 花了一年的时间才完善了此段代码
阅读量:1051
点赞量:0
收藏量:0