研读 C++ 语言 boost 库中无锁队列的实现

研读 C++ 语言 boost 库中无锁队列的实现

lockfree.queue(c++ boost)实现。

C++语言本身没有提供线程安全的容器,而高质量的 boost 库中有实现线程安全队列,而且还是无锁的实现,以下代码基于 boost 库 1.78.0 版本。

一、lockfree.queue 的实现

我只保留了主要的代码逻辑,方便理解代码含义。

queue 采用链表为底层实现方式,包括头节点 head 和尾节点 tail。通过预先分配一个不存储数据的傀儡节点,可以少掉很多边界条件的判断,保证队列中总是至少会有一个节点,将在头尾的两个节点访问分开。对于一个空队列,head 和 tail 都指向这个傀儡节点,而不是 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class queue {
// 队列中每个节点的结构定义
struct BOOST_ALIGNMENT(BOOST_LOCKFREE_CACHELINE_BYTES) node
{
node(T const & v, handle_type null_handle) : data(v)
{
// 增加标签来避免 ABA 问题
// 这里先只需要清楚是给链表节点的 next 指针赋值
tagged_node_handle old_next = next.load(memory_order_relaxed);
tagged_node_handle new_next (null_handle, old_next.get_next_tag());
next.store(new_next, memory_order_release);
}
// node 节点的数据部分就是常见的 next 指针和业务数据
atomic<tagged_node_handle> next;
T data;
};

// queue 类中数据定义
private:
// head 为队列头节点
atomic<tagged_node_handle> head_;
// tail 为队列尾节点
atomic<tagged_node_handle> tail_;
// pool 为节点池,存储 node 节点
pool_t pool;

public:
// queue 的构造函数,主要是使用默认的内存分配器初始化一定大小的 pool,然后调用 initialize 初始化 head 和 tail 节点
explicit queue(size_type n):
head_(tagged_node_handle(0, 0)),
tail_(tagged_node_handle(0, 0)),
pool(node_allocator(), n + 1)
{
initialize();
}

// 初始化 head 和 tail 节点
void initialize(void)
{
// 从 pool 中取出一个 node 节点做为傀儡节点,让 head 和 tail 都指向它
node * n = pool.template construct<true, false>(pool.null_handle());
tagged_node_handle dummy_node(pool.get_handle(n), 0);
head_.store(dummy_node, memory_order_relaxed);
tail_.store(dummy_node, memory_order_release);
}

// 线程安全的 push 底层调用函数,尾插的方式
template <bool Bounded>
bool do_push(T const & t)
{
// 从 pool 中取出一个 node 节点,将业务数据以引用的方式填充到 node 中
// 这个 node_handle 就是我们希望插入链表的节点
node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
handle_type node_handle = pool.get_handle(n);

if (n == NULL)
return false;
// 以循环的方式,如果没有插入节点成功,则重试
for (;;) {
// 安全的获取到 tail 指针指向的 node 节点
tagged_node_handle tail = tail_.load(memory_order_acquire);
node * tail_node = pool.get_pointer(tail);
// 安全的获取到 tail->next 指针指向的 node 节点
tagged_node_handle next = tail_node->next.load(memory_order_acquire);
node * next_ptr = pool.get_pointer(next);

// 在当我们获取 tail->next 时其他线程已经修改了 tail 的指向。做二次确认,并准备重新取 tail 指针
tagged_node_handle tail2 = tail_.load(memory_order_acquire);
if (BOOST_LIKELY(tail == tail2)) {
// tail->next 为空,代表可以插入 node 节点
if (next_ptr == 0) {
// 给 next 节点打上标签
tagged_node_handle new_tail_next(node_handle, next.get_next_tag());
// 如果 tail_node->next 等于 next 时,就把 new_tail_next 赋给 tail_node->next, 返回 true
// 如果 tail_node->next 不等于 next 时,说明有其他线程已经修改了 tail_node->next, 则返回 false 重新来
// compare_exchange_weak:可能返回虚假失败,且虚假失败时不会修改函数参数 expected 值。性能相对高
// compare_exchange_strong:不会出现虚假失败,性能相对低
// 这里概率性的会出现真的失败从而继续循环,选择 compare_exchange_weak 即使虚假失败,也无伤大雅。而且大大提高了性能
if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) {
// 给 tail 节点打上标签
tagged_node_handle new_tail(node_handle, tail.get_next_tag());
// 最后更新 tail 的指向,并返回 true

// 这里为什么不判断交换失败的情况??
// 因为假设线程 T1,上面 if 条件中 tail_node->next.compare_exchange_weak(next, new_tail_next) 执行成功的话
// tail_node->next = new_tail_next 操作成功,那么其他所有随后线程的 compare_exchange_weak 都会失败,然后就会再循环
// 此时,如果 T1 线程还没有更新 tail 指针,其他的线程就会继续失败,因为 tail_node->next 不为空了
// 直到 T1 线程更新完 tail 指针,其他的线程才可以拿到新的 tail 指针,继续操作。
// 也就意味着只要 tail_node->next 赋值成功,此线程对 tail 指针就时独占了,tail 指针必然可以被更新,不用担心失败

// 这也引入了一个新的问题,要是某个线程在这里停掉或者挂掉,那岂不是其他线程进入了死循环?
// 因此请看下面的 else 部分代码,如果 tail_node->next 不为空,则更新 tail 指针,使 tail 指针指向下一个。
// 而且更新 tail 指针相当于对所有的线程都更新了 tail 指针
tail_.compare_exchange_strong(tail, new_tail);
return true;
}
}
else {
// 如果 tail->next 不为空,则说明其他线程已经添加了节点到 tail->next 上, 因此更新 tail 的指针指向下一个
// 在更新前给 tail 指针打上标签
tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
tail_.compare_exchange_strong(tail, new_tail);
}
}
}
}

// 线程安全的 pop 底层调用函数,从头节点取出
template <typename U>
bool pop (U & ret)
{
// 如果队列为空,则 head 和 tail 指向同一个节点
for (;;) {
// 取出 head 指针
tagged_node_handle head = head_.load(memory_order_acquire);
node * head_ptr = pool.get_pointer(head);
// 取出 tail 指针
tagged_node_handle tail = tail_.load(memory_order_acquire);
// 取出 head->next 指针和其 node 节点
tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
node * next_ptr = pool.get_pointer(next);

// 如果 head 指针已经移动了,则重新取 head 指针
tagged_node_handle head2 = head_.load(memory_order_acquire);
if (BOOST_LIKELY(head == head2)) {
// 如果 head 指针和 tail 指针指在同一个节点,且 head-next 为空,说明队列此时是空的

// head 指针和 tail 指针指向同一个节点,已经说明了队列为空了,为什么还要加上 head->next(next_ptr) 为空这个条件呢?
// 因为当 push 操作做了一半,做到了将新 node 节点插到 tail->next 上时,还没有更新 tail 指针。
// 此时队列中是有元素的,而且 head 和 tail 指针的指向相同
if (pool.get_handle(head) == pool.get_handle(tail)) {
// 到这里,队列中一定没有元素,返回 false
if (next_ptr == 0)
return false;
// 如果是上述情况 head == tail,但队列中有元素,则说明 tail 指针落后了,更新 tail 指针且更新前加标签
tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
tail_.compare_exchange_strong(tail, new_tail);

} else {
// else 分支说明 head 和 tail 没有指向同一个 node 节点,为什么还要判断 head->next 是否为空呢?
// 由于 node 节点底层的内存空间是复用的,在分配节点时清除了 next_ptr,因此可能会看到一个空指针
// 当然这个判断与具体的实现有关
if (next_ptr == 0)
continue;
// 将 head->next 指向的 node 节点的数据拷贝给 ret
detail::copy_payload(next_ptr->data, ret);

tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
// 更新 head 指针指向 head->next
// 这里什么情况会失败呢?就是别的线程已经更新了 head 指针,并且拿走了 head 指针所指向的数据
// 因此当前线程就需要重新循环,重新获取 head 指针,直到更新成功为止
if (head_.compare_exchange_weak(head, new_head)) {
// 清理原来老的 dummy 节点
pool.template destruct<true>(head);
return true;
}
}
}
}
}
};

如上是 boost 库中关于无锁的线程安全队列的实现。其中涉及到的 ABA 问题以及 ABA 问题如何解决,还有 atomic 的内存顺序。而 atomic 的 compare_exchange_weak 和 compare_exchange_strong 就是 CAS 技术。由于上述注释已经很明确了,所以我就不在赘述具体的代码逻辑了。

二、 lockfree.queue 总结

boost 库实现的无锁的线程安全队列和如下两篇论文的思想是一致的。

业界大部分的无锁的 queue 大都是同样的思路,不过需要关注不同语言的不同特性,比如 gc 等等。