研读 Java 语言中线程安全队列的实现
BlockingQueue 和 ConcurrentLinkedQueue (java)实现
Java提供的线程安全的 Queue 可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue。以下的代码基于 openjdk version "11.0.8"
BlockingQueue 是一个接口,具体的实现有很多,如 ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockQueue、SynchronousQueue。就拿 ArrayBlockingQueue 来看吧
一、ArrayBlockingQueue 实现
ArrayBlockingQueue 提供了很多 API 供我们使用,在插入数据和获取数据我只选择了两个具有代表性的 API 进行分析,其余相似的 API 都是换汤不换药,基本的逻辑思想都是一致的。下面代码只罗列出比较重要的部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; notFull.signal(); return x; } }
|
可以看到 java 中加锁的 ArrayBlockingQueue 底层使用数组做为环形队列的实现是比较简单的。我们找找可以优化的点,我自认为:
- 使用一把锁来控制出队入队,相对效率较低,是否也可以借助分段的思想把入队和出队分裂成两个锁,减少锁竞争。可以参考 LinkedBlockingQueue,它采用两把锁的锁分离技术实现入队出队互不阻塞(这样相比arraryblockingqueue来说性能更好因为锁粒度更细)
- 使用条件变量来同步生产者和消费者,而且在锁内部进行条件变量的 notify 或 wait,如果底层使用系统调用则带来上下文切换的性能开销,且 notify 相对来说是很耗时的,会大大影响性能
二、ConcurrentLinkedQueue 的实现
一个基于链表的无锁线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue
是一个恰当的选择。此队列不允许使用 null
元素。
此实现采用了有效的“无等待 (wait-free)”算法,该算法基于 Maged M. Michael 和 Michael L. Scott 合著的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的算法。
java 的版本无锁线程安全队列的 ABA 问题是采用延迟回收思路,依赖 java 的垃圾回收机制。
下面的代码只罗列主要逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { private static class Node<E> { volatile E item; volatile Node<E> next; } private transient volatile Node<E> head; private transient volatile Node<E> tail; public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } public boolean offer(E e) { final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { if (p.casNext(null, newNode)) { if (p != t) casTail(t, newNode); return true; } } else if (p == q) p = (t != (t = tail)) ? t : head; else p = (p != t && t != (t = tail)) ? t : q; } } public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } } }
|
如上是 ConcurrentLinkedQueue 的基本实现,此实现是无界的,因此要及时出队,防止队列元素过多,内存暴增。实现基本上和C++ boost 库中 lockfree.queue 的逻辑思路一致。
三、简单总结
本小节介绍了 Java 中的线程安全容器的实现,介绍了一个加锁的实现和无锁的实现。两者各有优缺点,希望我们可以对比着学习。