阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
阻塞队列常用于生产者和消费者的场景:
通用概念
队列类型
// 按照类型分类
• 无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque
• 普通阻塞队列:基于数组的ArrayBlockingQueue,基于链表的LinkedBlockingQueue和LinkedBlockingDeque
• 优先级阻塞队列:PriorityBlockingQueue
• 延时阻塞队列:DelayQueue
• 其他阻塞队列:SynchronousQueue和LinkedTransferQueue
处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
检查方法 element() peek() 不可用 不可用
性能对比
线程数 | 20 | 50 | 100 | 200 | 500 | 1000 |
---|---|---|---|---|---|---|
LinkedBlockingQueue | 15,0 | 31,15 | 32,16 | 63,32 | 203,47 | 563,110 |
ArrayBlockingQueue | 15,0 | 16,15 | 31,15 | 47,16 | 125,47 | 364,68 |
PriorityBlockingQueue | 78,78 | 172,188 | 360,422 | 813,969 | 3094,2641 | 6547,5453 |
> 一个由数组实现的有界阻塞队列。该队列采用 FIFO 的原则对元素进行排序添加的
> ArrayBlockingQueue 为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了
> ArrayBlockingQueue 支持对等待的生产者线程和使用者线程进行排序的可选公平策略
- 但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。
- 公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。
// 构造器 :
MC- ArrayBlockingQueue(int capacity)
MC- ArrayBlockingQueue(int capacity, boolean fair)
// 抽象类和接口
I- BlockingQueue<E> : 提供了在多线程环境下的出列、入列操作
?- 内部使用可重入锁 ReentrantLock + Condition 来完成多线程环境的并发操作
// 变量
• items 变量,一个定长数组,维护 ArrayBlockingQueue 的元素。
• takeIndex 变量,int ,为 ArrayBlockingQueue 队首位置。
• putIndex 变量,int ,ArrayBlockingQueue 队尾位置。
• count 变量,元素个数。
• lock 变量,ReentrantLock ,ArrayBlockingQueue 出列入列都必须获取该锁,两个步骤共用一个锁。
• notEmpty 变量,非空,即出列条件。
• notFull 变量,未满,即入列条件。
// 入队
M- add(E e) 方法 : 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true , 满了抛出异常
M- offer(E e) 方法 : 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true , 满了返回false
M- offer(E e, long timeout, TimeUnit unit) 方法 : 将指定的元素插入此队列的尾部 , 已满在设定时间内等待
M- put(E e) 方法 : 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间
M- enqueue :
- 正常添加元素 , 到达队尾的时候重定向到队头
- 总数 + 1
- 通知阻塞线程
// 出列
M- poll() 方法:获取并移除此队列的头,如果此队列为空,则返回 null 。
M- poll(long timeout, TimeUnit unit) 方法:获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
M- take() 方法:获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
M- remove(Object o) 方法:从此队列中移除指定元素的单个实例(如果存在)。
// 核心总结 :
M- offer : 通过 ReentrantLock 上锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- finally -> lock.unlock();
// 关键点 :
1 创建后,容量将无法更改
2 尝试将元素放入满队列将导致操作阻塞
3 尝试从空队列中取出一个元素]也会类似地被阻塞
4 支持可选的公平性策略
支持延时获取元素的无界阻塞队列。里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素
- 如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。
- 也就是说只有在延迟期到时才能够从队列中取元素。
// 作用 :
• 缓存:清掉缓存中超时的缓存数据
• 任务超时处理
// 关键 :
1. 可重入锁ReentrantLock
2. 用于阻塞和通知的Condition对象
3. 根据Delay时间排序的优先级队列:PriorityQueue
4. 用于优化阻塞通知的线程元素leader
// 结构 :
E- AbstractQueue
I- BlockingQueue
M- offer() : 往PriorityQueue中添加元素
- 向 PriorityQueue中插入元素
- 判断当前元素是否为对首元素,如果是的话则设置leader=null , 唤醒所有线程
M- take()
- 获取队首 --- q.peek
IF- 队首为空 , 阻塞 ,等待off 唤醒
ELSE-
获取队首的超时时间 , 已过期则出对
- 如果存在其他线程操作 ,阻塞 , 不存在其他线程 , 独占
- 超时阻塞 --- available.awaitNanos(delay);
- 唤醒阻塞线程
// 使用方式 :
// Step 1 : new 一个
DelayQueue queue = new DelayQueue();
// Step 2 : 加东西
queue.offer(createUserDelayQueueTO());
SynchronousQueue非常适合做交换工作,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务。
C- SynchronousQueue
E- AbstractQueue
I- BlockingQueue
C- TransferQueue
?- 实现公平性策略的核心类,其节点为QNode
LinkedBlockingQueue是一个阻塞队列
// 简介
是先进先出队列FIFO。
采用ReentrantLock保证线程安全
// 操作结果
增加 : 队列满 >
put -> 一直阻塞
add -> 抛出异常
offer -> 返回false
删除 : 队列为空
remove -> NoSuchElementException
poll -> 返回false
take -> 阻塞
// 源码分析
LinkedBlockingQueue
C- static class Node<E> : 核心静态内部类 , 表示一个节点
|- E item : 节点原始
|- Node<E> next : 下一节点
F- int capacity : 容量界限
F- AtomicInteger count : 当前元素个数
F- Node<E> head :头节点
F- Node<E> last : 尾节点
F- ReentrantLock takeLock : take,poll等获取锁
F- Condition notEmpty : 等待任务的等待队列
F- ReentrantLock putLock : put,offer等插入锁
F- Condition notFull : 等待插入的等待队列
MC- LinkedBlockingQueue() : 最大数量
MC- LinkedBlockingQueue(int capacity) : 指定数量
MC- LinkedBlockingQueue(Collection<? extends E> c) : 指定集合
M- signalNotEmpty : 表示等待take。put/offer调用,否则通常不会锁定takeLock
|- 获取 tackLock : this.takeLock
|- 锁定takeLock -> takeLock.lock();
|- 唤醒take 线程等待队列 -> notEmpty.signal();
|- 释放锁 -> takeLock.unlock();
M- signalNotFull : 表示等待put,take/poll 调用
|- 获取putLock : this.putLock;
|- 锁定putLock -> putLock.lock();
|- 唤醒插入线程等待队列 -> notFull.signal();
|- 释放锁
M- enqueue : 在队列尾部插入
|- last = last.next = node;
M- E dequeue():移除队列头
|- 保留头指针
|- 获取当前链表的第一个元素
|- 头指针指向第一个元素
|- 获取第一个元素的值并且移除第一个
|- 返回第一个元素的值
M- fullyLock : 锁定putLock和takeLock
|- putLock.lock();
|- takeLock.lock();
M- fullyUnlock : 先解锁takeLock,再解锁putLock
|- putLock.unlock();
M- offer: 将给定的元素设置到队列中,如果设置成功返回true
|- 非空判断 , 获取计数器
|- 判断队列是否已满 -> 返回 Boolean
|- 新建节点
|- 获取插入锁 , 并且锁定
|- 队列未满 -> 插入 -> 计数
|- 如果未满 ,继续唤醒插入线程
|- 解锁
|- 如果对了为空 ,获取线程锁阻塞
M- offer(E e, long timeout, TimeUnit unit) :给定的时间内设置到队列中
M- put(E e) : 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞 , 直到队列中有多余的空间
|- 核心1 : putLock.lockInterruptibly();
-> 设置前加锁
|- 核心2 : notFull.await();
-> 队列满时等待
M- take() : 从队列中获取值,如果队列中没有值
M- peek() : 非阻塞的获取队列中的第一个元素,不出队列
M- poll() : 非阻塞的获取队列中的值,未获取到返回null。
M- poll(long timeout, TimeUnit unit) :在给定的时间里,从队列中获取值
M- remove(Object o):从队列中移除指定的值。将两把锁都锁定。
M- clear():清空队列。
M- drainTo(Collection c):将队列中值,全部移除,并发设置到给定的集合中。
与其他BlockingQueue相比,他多实现了一个接口TransferQueue, 该接口是对BlockingQueue的一种补充,多了tryTranfer()和transfer()两类方法:
- PriorityBlockingQueue是支持优先级的无界队列。
- 默认情况下采用自然顺序排序,当然也可以通过自定义Comparator来指定元素的排序顺序。
- PriorityBlockingQueue内部采用二叉堆的实现方式,整个处理过程并不是特别复杂。
- 添加操作则是不断“上冒”,而删除操作则是不断“下掉”。
Queue | 阻塞与否 | 是否有界 | 线程安全保障 | 适用场景 | 注意事项 |
---|---|---|---|---|---|
ArrayBlockingQueue | 阻塞 | 有界 | 一把全局锁 | 生产消费模型,平衡两边处理速度 | 用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间) |
LinkedBlockingQueue | 阻塞 | 可配置 | 存取采用 2 把锁 | 生产消费模型,平衡两边处理速度 | 无界的时候注意内存溢出问题,用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加 JVM 垃圾回收的负担。 |
而 ArrayDeque、LinkedBlockingDeque 就是双端队列,类名以 Deque 结尾
正如阻塞队列适用于生产者消费者模式,双端队列同样适用与另一种模式,即工作密取。
> 阻塞队列 : 阻塞队列有普通的先进先出队列,
> 包括基于数组的ArrayBlockingQueue
> 基于链表的LinkedBlockingQueue/LinkedBlockingDeque
> 基于堆的优先级阻塞队列PriorityBlockingQueue
> 可用于定时任务的延时阻塞队列DelayQueue
> 用于特殊场景的阻塞队列SynchronousQueue和LinkedTransferQueue
CopyOnWrite容器即写时复制的容器。 当我们往容器添加元素的时候,先将当前容器进行Copy,复制出一个新的容器, 然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
优缺点
|- 优点:
读操作性能很高,比较适用于读多写少的并发场景。
Java的list在遍历时,若中途有别的线程对list容器进行修改,则会抛出ConcurrentModificationException异常。
而CopyOnWriteArrayList由于其"读写分离"的思想,遍历和修改操作分别作用在不同的list容器
?- 所以在使用迭代器进行遍历时候,也就不会抛出ConcurrentModificationException异常。
|- 缺点:
内存占用问题,执行写操作时会发生数组拷贝
无法保证实时性,Vector对于读写操作均加锁同步,可以保证读和写的强一致性。
而CopyOnWriteArrayList由于其实现策略的原因,写和读分别作用在新老不同容器上
?- 在写操作执行过程中,读不会阻塞但读取到的却是老容器的数据。
|- 使用场景 :
CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景。
CopyOnWriteArrayList
F- ReentrantLock lock = new ReentrantLock() --> 重入锁
F- volatile Object[] array; --> 只能通过 getArray/setArray 访问的数组
M- Object[] getArray() --> 获取数组,非私有方法以便于CopyOnWriteArraySet类的访问
M- setArray(Object[] a) --> 设置数组
M- CopyOnWriteArrayList -- 创建一个空数组
M- CopyOnWriteArrayList(Collection<? extends E> c)
?- 创建一个包含指定集合的数组
B- 如果c的类类型为CopyOnWriteArrayList
|- 直接获取其数组
E- 如果不是
|- 通过 toArray 转数组
|- 如果c.toArray返回的不是 Object[]类型,则通过数组拷贝
|- 设置数组 : setArray(elements);
M- CopyOnWriteArrayList(E[] toCopyIn) : 创建包含给定数组副本的列表
M- size() : 获取数量
M- isEmpty() : 判断列表元素是否为空
M- eq(Object o1, Object o2) : 判断o1 o2是否相等
M- indexOf(Object o, Object[] elements,int index, int fence)
B- 为null , for 循环迭代第一个 null
E- 不为 null ,for 循环 eq
M- lastIndexOf :索引倒叙
M- contains : IndexOf 判断
M- clone : 浅拷贝
|- 重置锁定
|- 返回clone 属性
M- toArray
M- get : 获取原数组中元素
M- set:用指定的元素替换列表中指定位置的元素
|- 获取当前锁并且锁定
|- 获取元素数组
|- 获取老的值
B- 如果老的值和给定值不相等
|- 原数组拷贝 , 将新数组中的索引位置修改为新值
|- 将原数组替换为新数组
E- 否则
|- setArray(elements);
|- 返回老的值
M- add(E e) : 将指定的元素追加到此列表的末尾
|- 获取重入锁 ,锁定
|- 获取原数组
|- 原数组拷贝 并增加一个空位
|- 将指定元素增加到新数组新增的空位中
|- 新数组替换原数组
M- remove :
|- 获取锁并且锁定
|- 获取原数组
|- 获取要删除的元素值 , 获取要移动的值
B- 如果为0,则删除的是最后一个元素
-> setArray(Arrays.copyOf(elements, len - 1));
E- 否则 复制拷贝
|- 新建数组
|- 将原数组中,索引index之前的所有数据,拷贝到新数组中
|- 将元素组,索引index+1 之后的numMoved个元素,复制到新数组,索引index之后
|- 替换原数组
|- 返回老的值 ,最后释放锁
C- COWSubList : 内部视图类
阅读量:2011
点赞量:0
收藏量:0