研读 Java 语言中线程安全队列的实现

研读 Java 语言中线程安全队列的实现

BlockingQueue 和 ConcurrentLinkedQueue (java)实现

Java提供的线程安全的 Queue 可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue。以下的代码基于 openjdk version "11.0.8"

BlockingQueue 是一个接口,具体的实现有很多,如 ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockQueue、SynchronousQueue。就拿 ArrayBlockingQueue 来看吧

一、ArrayBlockingQueue 实现

ArrayBlockingQueue 提供了很多 API 供我们使用,在插入数据和获取数据我只选择了两个具有代表性的 API 进行分析,其余相似的 API 都是换汤不换药,基本的逻辑思想都是一致的。下面代码只罗列出比较重要的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 底层用于存储数据节点的数组
final Object[] items;
// 移除元素现在所处的位置索引
int takeIndex;
// 插入元素现在所处的位置索引
int putIndex;
// 队列中元素的个数
int count;
// 可重入的独占锁
final ReentrantLock lock;
// 为了移除元素的等待条件
private final Condition notEmpty;
// 为了插入元素的等待条件
private final Condition notFull;

// 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
// 首先加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 当队列元素个数已满的时候,等待一定时间再去判断,如果还是满的就退出
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
// 队列有空间,则插入元素
enqueue(e);
return true;
} finally {
// 最后解锁
lock.unlock();
}
}
// 向队列尾插入元素
private void enqueue(E x) {
final Object[] items = this.items;
// 在 putIndex 位置添加数据
items[putIndex] = x;
// 因为队列底层使用环形数组,所以当达到数组长度时,putIndex 重头再来
if (++putIndex == items.length)
putIndex = 0;
// 元素个数自增
count++;
// 添加完数据后,说明数组中有数据了,所以可以唤醒 notEmpty 条件对象等待队列(链表)中第一个可用线程去获取数据
notEmpty.signal();
}

// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
// 加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列为空,则等待一定时间再判断
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
// 取出数据
return dequeue();
} finally {
lock.unlock();
}
}
// 从队列头取出数据
private E dequeue() {
final Object[] items = this.items;
// 提取 takeIndex 位置上的数据
E x = (E) items[takeIndex];
// 清空 takeIndex 位置上的数据
items[takeIndex] = null;
// 环形数组
if (++takeIndex == items.length)
takeIndex = 0;
// 数组中元素个数减一
count--;
// 提取完数据后,说明数组中有空位,所以可以唤醒 notFull 条件对象的等待队列(链表)中的第一个可用线程去写数据
notFull.signal();
return x;
}
}

可以看到 java 中加锁的 ArrayBlockingQueue 底层使用数组做为环形队列的实现是比较简单的。我们找找可以优化的点,我自认为:

  1. 使用一把锁来控制出队入队,相对效率较低,是否也可以借助分段的思想把入队和出队分裂成两个锁,减少锁竞争。可以参考 LinkedBlockingQueue,它采用两把锁的锁分离技术实现入队出队互不阻塞(这样相比arraryblockingqueue来说性能更好因为锁粒度更细)
  2. 使用条件变量来同步生产者和消费者,而且在锁内部进行条件变量的 notify 或 wait,如果底层使用系统调用则带来上下文切换的性能开销,且 notify 相对来说是很耗时的,会大大影响性能

二、ConcurrentLinkedQueue 的实现

一个基于链表的无锁线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。

此实现采用了有效的“无等待 (wait-free)”算法,该算法基于 Maged M. Michael 和 Michael L. Scott 合著的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的算法。

java 的版本无锁线程安全队列的 ABA 问题是采用延迟回收思路,依赖 java 的垃圾回收机制。

下面的代码只罗列主要逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
// Node 节点定义
private static class Node<E> {
// Node 中的元素
volatile E item;
// Node 中 next 指针
volatile Node<E> next;
}
// head 头指针
private transient volatile Node<E> head;
// tail 尾指针
private transient volatile Node<E> tail;

// 构造函数,会将 head 和 tail 指向一个傀儡节点
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

// offer 函数,将指定元素插入此队列的尾部
public boolean offer(E e) {
// 先拿到业务数据并创建一个 Node 节点
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// 代表 tail->next 为空,则说明 tail->next 没有被别的线程添加元素,现在给 tail->next 添加元素
if (p.casNext(null, newNode)) {
// 在 CAS 中判断 tail->next 是否为空,为空则将 newNode 赋给 tail->next
// 将元素赋给 tail->next 后,需要更新 tail 的位置
// p 此时指向 newNode(tail->next) 位置,t 指向 tail 位置。这是理想情况
// 因为这里会出现多个线程同时发现 tail->next 不为空的情况,所以 tail 指针和实际的尾节点的距离不一定是1
if (p != t)
// 因为没有要求 tail 指针和实际的尾节点的距离是1
casTail(t, newNode);
return true;
}
// 这里是在 CAS 中判断 tail->next 不为空,因为别的线程已经给 tail->next 添加了元素,因此需要重新获取 tail 的值
}
else if (p == q)
// 如果发现当前 p 节点不是实际上的尾节点,会先检查它的 next 指针是否指向了自己,
// 因为在出队函数 poll 中,将一个元素出队后会把它的 next 指向自己,所以这一步实际上是判断当前的 p 节点是否已经出队
// 如果 tail 指针发送了改变,就从最新的 tail 开始遍历
// 否则,从 head 开始遍历,因为这时候 tail 可能指向一个不可用(已经从队列中移除)的节点,于是有了下面的逻辑
// 比较原来的 t 和 tail 是否相等,并将 tail 赋给 t
// 如果不相等,则 p 获得了当前 tail 值,否则获取 head 值
p = (t != (t = tail)) ? t : head;
else
// 这里是没有遍历到尾节点的情况
// 如果发现已经进行过一次向后遍历的过程,并且 tail 指针发生了改变,就直接使用 tail 指针
// 如果还没有遍历过,或者虽然遍历但是 tail 指针没有变,就继续遍历
p = (p != t && t != (t = tail)) ? t : q;
}
}

// poll 函数,获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 拿到头节点,并把原头节点置空
if (item != null && p.casItem(item, null)) {
// CAS 成功的话说明已经移除一个节点了,因为会有多个线程同时操作移除 head,所以从 head 至少向后遍历一次时才修改 head 指针
if (p != h) // hop two nodes at a time
// 如果刚删除的节点 p->next 为空,则让 p 做为 head(p 做为傀儡节点)
// 如果刚删除的节点 p->next 不为空,则让 p->next 做为 head
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
// 这里是 p->next 为空,说明队列中无元素,p 现在处在傀儡节点上
// 在返回前,把 p 设置为新的 head 来减少其他线程的遍历开销
updateHead(h, p);
return null;
}
else if (p == q)
// 这里是当前检查的这个节点已经被别的线程从队列中移除了,那就重新开始
continue restartFromHead;
else
// q 是 p->next, 相当于 p = p->next, 相当于从 head 向后遍历
p = q;
}
}
}
}

如上是 ConcurrentLinkedQueue 的基本实现,此实现是无界的,因此要及时出队,防止队列元素过多,内存暴增。实现基本上和C++ boost 库中 lockfree.queue 的逻辑思路一致。

三、简单总结

本小节介绍了 Java 中的线程安全容器的实现,介绍了一个加锁的实现和无锁的实现。两者各有优缺点,希望我们可以对比着学习。