实现一个无锁的线程安全队列
我们前面铺垫了很多基础知识,并且研读了优秀的源码。接下来我们要亲手实现一个无锁的线程安全队列,来真正的体会了解下无锁化编程。
我们首先说明我们的实现方式:
- 使用数组作为数据的存储格式,也即我们的队列的底层是以数组进行存储
- 规定队列的容量,防止出现无容量上限的队列大量占用内存的情况。
- 采用环形队列的方式,即在进行写操作时,队列满了之后,从数组 0 位置继续覆盖插入
- 提供的读、写方法返回值都为 bool 类型。将更多的逻辑处理权释放给 API 使用者
- 采用 C++11 编写
一、类的初始化
1 | struct Node { |
上面是我们的类中的所有成员变量。
- 结构体 Node 存储用户的数据,其中 data 是具体用户的数据。
queue_
是一个指针,存储的是一个以 Node 为元素的数组capacity_
是queue_
数组的长度capacity_mask_
是capacity_ - 1
,作为掩码值,下文会介绍head_
代表 pop 的位置。即从head_
位置进行 poptail_
代表 push 的位置,即为tail_
位置进行 push
接下来介绍构造函数:
1 | explicit LockFreeQueue(size_t capacity) { |
1. 数组容量初始化
在很多情况下,尤其是和缓存等数据结构相关的情况下,使用长度为 2 的幂的数组大小可以大大提高性能。具体来说,对于长度为 2 的幂的数组,可以使用位运算代替模运算来计算数组索引,这要比使用模运算更高效。此外,多说一点,如果是哈希表,长度为 2 的幂的数组大小可以减少哈希冲突的数量,从而提高哈希表的性能。因此我们选择 2 的幂的大小作为数组的大小。
用户传入的数组容量大小作为一个参考值,我们会通过此代码段将用户给的容量向上舍入到下一个最接近的 2 的幂。这是通过将给定容量减 1,然后在一个循环中设置一个掩码,该掩码将 “最高设置位” 之后的所有位都设置为 1,来实现的。最后,将掩码加 1 就可以获得下一个最接近的 2 的幂。
1 | capacity_mask_ = capacity_ - 1; |
因此,capacity_
的值将被设置为大于或等于用户给定容量的最小 2 的幂。比如用户给定容量为 11,则我们的 capacity_
将被设置为 16 作为最终容量,而 capacity_mask_
为 15 作为掩码值,这个掩码值下文会用到。
2. 其他成员初始化
接下来就来初始化其他成员。我们首先初始化数组,给数组申请内存。然后给数组中所有的 Node 节点设置初始值。
节点的 tail 值设置为当前节点,节点的 head 值设置为 -1。
类的 tail_ 值和 head_ 值都设置为 0。代表起初 push 和 pop 的位置都在数组的 0 位置。
二、写操作
接下来我们来看看如何进行写操作,先展示代码
1 | bool push_back(const T& data) { |
首先我们获取到要 push 的位置,也就是 tail_
值。此时对于 tail_
的读取我们可以使用 memory_order_relaxed
内存松散模型,只要保证获取 tail_
值本身是原子的即可。
接下来 tail & capacity_mask_
这句代码是什么意思呢?我们上面提到 capacity_mask_ = capacity_ - 1
。并且 capacity_
是 2 的幂。也就是说 capacity_
的二进制数据中只有 1 位是 1,其他位都为 0,而且为 1 的 这一位是当前数字的最高位。对应的 capacity_mask_
的二进制数据中当前数字的最高位是 0,其他位都是 1。比如:
1 | capacity_ = 8; // 0000 1000 |
好,我们在来看 tail & capacity_mask_
这句代码的意思。简单来说,tail 值小于 capacity_
时,tail 值不变;tail 值等于 capacity_
时,tail 值为 0。逻辑比较简单,所以这行代码是为了实现循环,即 [0, capacity_)
这个左闭右开区间中数字循环获取。也就实现了循环队列的效果。
然后,我们获取到可以 push 的位置的 Node 节点。在循环内部我们先判断 node 节点的 tail 值是不是我们之前获取到的 tail_ 的位置,也就是判断当前获取到 node 节点是不是应该插入的位置,有可能在我们获取 node 的过程中,有其他线程插入了 node 节点,导致 tail_ 的移动。如果判断到获取的 node 节点不是应该插入的位置,直接返回 false,插入失败。将选择权交给用户。
如果判断当前获取的 node 节点就是应该插入的位置,我们再原子的进行修改 tail_
的值。修改成功则退出循环;如果修改失败说明其他线程又抢先一步插入了节点,导致了 tail_
的值的变化,那么我们继续循环。
我们前面小节提到 compare_exchange_weak 的虚假失败。因为虚假失败通常很少发生,而且 compare_exchange_weak 的性能往往比 compare_exchange_strong 要快很多。即使发生了虚假失败,我们再来重新获取插入节点也未尝不可。
当从循环中退出时,我们终于得到了一个 node 节点,这个节点就是我们应该插入的位置。而且此时没有竞争,其他线程都忙着去竞争 tail_
去了,我们直接给这个 node 节点赋值即可。
new (&node->data)T(data);
这行代码是 C++ 中的 placement new
操作。
简单说一下,placement new
是 operator new
的一个重载版本,允许我们在一个已经分配好的内存中构造一个新的对象。我们要知道 new 操作符分配内存需要在堆中查找足够大的剩余空间,这个操作是很慢的,而且还有可能出现无法分配内存的异常(空间不够)。placement new就可以解决这个问题。我们构造对象都是在一个预先准备好了的内存缓冲区中进行,不需要查找内存,内存分配的时间是常数;而且不会出现在程序运行中途出现内存不足的异常。所以,placement new非常适合那些对时间要求比较高,长时间运行不希望被打断的应用程序。
同时,我们在初始化的时候已经申请好内存空间,在高并发的入队、出队时,直接使用 placement new
可以有更快的速度,增加更高的并发量。
最后,node->head.store(tail, std::memory_order_release);
这行代码将 node 节点的 head 值设置为之前获取到的 tail 值。这里需要 memory_order_release
内存模型,保证先将用户的 data 数据放在 node 节点后,再设置 node 节点的 head 值。这个顺序不能反了,因为当我们在 pop 的时候,是通过 node 节点的 head 值作为判断条件的,一旦设置了 node 节点的 head 值,就说明此时此 node 值可以读了。但是很明显,这个时候,用户的 data 值还没有赋给 node,可能会造成预料之外的问题。
三、读操作
接下来我们来看如何进行读操作。如下代码
1 | bool pop_front(T& result) { |
读操作首先是获取 head_
数值,也就是要 pop 的位置。这里可以使用 memory_order_relaxed
内存顺序,只需要保证 head_
本身的读操作是原子的即可。
然后从队列中获取 pop 位置的 node 节点。head & capacity_mask_
是为了实现环形队列,即当 head 走到 capacity_
时,head & capacity_mask_
返回 0。
获取到节点 node 后,先判断和 head_
是否相等,有可能在中间过程中,其他线程已经获取到了节点,并且更新了 head_
值。如果不相等,直接返回 false,pop 失败,交给用户处理。
这里注意,我们在初始化的时候,设置的所有 node 节点的 head 值为 -1,所以当队列中没有元素的时候,会直接返回 false。
接下来进行设置 head_
位置的值。使用 compare_exchange_weak
CAS 操作,原子的给 head_
赋上 head+1
的值,当然前提条件是 head_
值没有被其他线程修改。如果成功,退出循环,我们开始使用这个 node 节点。如果不成功,说明其他线程已经修改了 head_
的值了,我们继续循环上面的步骤。出现虚假唤醒也不要紧,继续循环上面的步骤一次也是可以的。而且 compare_exchange_weak 的性能往往比 compare_exchange_strong 要快很多。
退出循环后,我们将这个节点的数据通过参数 result 传出去,这个 result 是引用类型。并且调用用户数据的析构函数。因为我们在 push 的时候使用 placement new
调用了构造函数,此时调用析构函数将这个位置的内存进行重置。注意只是销毁了对象,这块内存空间不会被释放,其他对象还可以在这块内存空间上进行 placement new
操作。对于 placement new
操作不熟的可以去了解下。
最后,我们设置节点的 tail 值,设置为一个非法值即可。并且需要 memory_order_release
内存顺序。保证最后一步在设置节点 node 的 tail 值。因为设置了节点 node 的 tail 值,证明这个节点是可以用了,可以被 push 进数据,但是很明显,需要将当前节点 node 的数据赋给 result,并且重置内存空间后,这个节点才可用。
四、析构操作
1 | ~LockFreeQueue() { |
析构操作中,将有数据的节点 node 全部进行析构。析构后释放 queue_
这块内存即可
五、ABA 问题
我们再次来看看 ABA 问题,ABA 问题最容易发生在 lock free 算法中,CAS 可能最明显,如果 CAS 操作的对象是指针。因为 CAS 判断的是指针的值。那么很明显,指针的值可以不变,但是指针指向的内容却可以变化。
假定 CAS 操作的对象是指针,如果 pop 的时候,在做 CAS 操作之前,如果这个指针指向的那块内存又被回收了,并且被重用了,而重用的内容又被 push 进来。这时候,表面上看,这个节点没有一点问题,可以顺利通过 CAS 操作,但是指针指向的内容是有问题的。
所幸,我们的实现中,使用的 CAS 操作都是整形,没有涉及到指针类型,因此不存在 ABA 问题。
但是这里还是多说两句,如果遇到了 CAS 操作的对象是指针怎么办?使用节点内存引用计数。可以参考:https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.53.8674&rep=rep1&type=pdf
六、简单总结
本小节实现了无锁的线程安全队列,是以前所有小节的一个总结,大家一定要自己动手实现,有不懂的地方及时攻克。
由于本人才疏学浅,代码中如果有问题,烦请指出,如果有优化点,也可以与我沟通交流。
最后我们贴出所有这个无锁的线程安全队列类的整体实现。
七、整体实现
1 | namespace lock_free { |