对于线程安全的理解

对于线程安全的理解

对于21世纪的人类,已经不满足于单核并行的模式;提高效率,增效降本才是我们向往的。程序设计中选择什么样的数据结构体是解决某个问题的关键。如果多线程使用到的数据结构体要满足并发,则涉及到数据的线程安全问题。

  • 一种解决办法是选择单独的互斥元或外部锁来使数据结构体在某一时间段独占,且释放后没有残留。
  • 另一种就是设计可以多线程同时访问的数据结构体。明显,后者在大部分场景是效率较高的。

查看更多

内存屏障和 ABA 问题

内存屏障和 ABA 问题

一、内存屏障

内存屏障也被称为内存栅栏,都是一个意思。内存屏障是全局操作,在之前内存顺序的松散模型中,编译器或者硬件通常可以自由的进行重新排序。屏障限制了这一自由。

从 C++11 开始,提供了下面两个机制:

  • std::atomic_thread_fence:在线程间进行数据访问的同步
  • std::atomic_signal_fence:线程和信号处理器间的同步
查看更多

对于内存模型的理解

对于内存模型的理解

多核时代,程序员们为了系统运行效率做了很多事情。并发、多线程是其中绕不开的一个话题,有了多线程,随之而来的就是线程之间的同步,临界区的出现,然后就是锁的使用。程序员随之发现锁的开销较大,于是有了缩短临界区话题,尽可能的让临界区变得更小一点。但是总归临界区的缩小是有限度的,也就是有天花板的。因此我们开始探索原子操作,无锁化编程。于是为了功能正常的情况下,还要保证良好的效率,本文探讨原子操作的背后,内存的组织形式,编译器、cpu 的执行顺序,语言为 c++ 语言。

一、内存模型的由来

c++11 标准提出了内存模型,而在 c++11 之前,c++ 本身没有多线程的概念,c++ 使用者使用的是操作系统为我们提供的多线程、原子操作。那时的编译器和处理器认为系统中只有一个执行流。但在多线程之后,编码变难了,开发者编写的代码和最终运行的代码之间往往存在较大的差异,而运行的结果与开发者预期的一致,只是表现而已。

那么产生差异的原因主要来自于如下三个方面:

  • 编译器的优化
查看更多

对于原子操作的理解

对于原子操作的理解

一、概念理解

原子操作:是一个不可分割的操作,从系统中的任何一个线程中,你都无法观察到完成了一半的这种操作,它要么做完了,要么没有做完。如果读取对象值的载入操作是原子的,并且所有对该对象的修改也都是原子的,那么这个载入操作所获得得要么是对象的初始值,要么是被修改者修改后的值。

CAS 的意思:是 Compare & Set 或者 Compare & Swap。整个过程是原子的。现代几乎所有的CPU指令都支持 CAS 的原子操作,X86 下对应的是 CMPXCHG 汇编指令。

在 c++ 的 atomic 中关于 CAS 的实现有两种:

1
2
3
4
template< class T >
bool compare_exchange_weak (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept;
template< class T >
bool compare_exchange_strong (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept;

查看更多

研读 C++ 语言 boost 库中无锁队列的实现

研读 C++ 语言 boost 库中无锁队列的实现

lockfree.queue(c++ boost)实现。

C++语言本身没有提供线程安全的容器,而高质量的 boost 库中有实现线程安全队列,而且还是无锁的实现,以下代码基于 boost 库 1.78.0 版本。

一、lockfree.queue 的实现

我只保留了主要的代码逻辑,方便理解代码含义。

queue 采用链表为底层实现方式,包括头节点 head 和尾节点 tail。通过预先分配一个不存储数据的傀儡节点,可以少掉很多边界条件的判断,保证队列中总是至少会有一个节点,将在头尾的两个节点访问分开。对于一个空队列,head 和 tail 都指向这个傀儡节点,而不是 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class queue {
// 队列中每个节点的结构定义
struct BOOST_ALIGNMENT(BOOST_LOCKFREE_CACHELINE_BYTES) node
{
node(T const & v, handle_type null_handle) : data(v)
{
// 增加标签来避免 ABA 问题
// 这里先只需要清楚是给链表节点的 next 指针赋值
tagged_node_handle old_next = next.load(memory_order_relaxed);
tagged_node_handle new_next (null_handle, old_next.get_next_tag());
next.store(new_next, memory_order_release);
}
// node 节点的数据部分就是常见的 next 指针和业务数据
atomic<tagged_node_handle> next;
T data;
};

// queue 类中数据定义
private:
// head 为队列头节点
atomic<tagged_node_handle> head_;
// tail 为队列尾节点
atomic<tagged_node_handle> tail_;
// pool 为节点池,存储 node 节点
pool_t pool;

public:
// queue 的构造函数,主要是使用默认的内存分配器初始化一定大小的 pool,然后调用 initialize 初始化 head 和 tail 节点
explicit queue(size_type n):
head_(tagged_node_handle(0, 0)),
tail_(tagged_node_handle(0, 0)),
pool(node_allocator(), n + 1)
{
initialize();
}

// 初始化 head 和 tail 节点
void initialize(void)
{
// 从 pool 中取出一个 node 节点做为傀儡节点,让 head 和 tail 都指向它
node * n = pool.template construct<true, false>(pool.null_handle());
tagged_node_handle dummy_node(pool.get_handle(n), 0);
head_.store(dummy_node, memory_order_relaxed);
tail_.store(dummy_node, memory_order_release);
}

// 线程安全的 push 底层调用函数,尾插的方式
template <bool Bounded>
bool do_push(T const & t)
{
// 从 pool 中取出一个 node 节点,将业务数据以引用的方式填充到 node 中
// 这个 node_handle 就是我们希望插入链表的节点
node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
handle_type node_handle = pool.get_handle(n);

if (n == NULL)
return false;
// 以循环的方式,如果没有插入节点成功,则重试
for (;;) {
// 安全的获取到 tail 指针指向的 node 节点
tagged_node_handle tail = tail_.load(memory_order_acquire);
node * tail_node = pool.get_pointer(tail);
// 安全的获取到 tail->next 指针指向的 node 节点
tagged_node_handle next = tail_node->next.load(memory_order_acquire);
node * next_ptr = pool.get_pointer(next);

// 在当我们获取 tail->next 时其他线程已经修改了 tail 的指向。做二次确认,并准备重新取 tail 指针
tagged_node_handle tail2 = tail_.load(memory_order_acquire);
if (BOOST_LIKELY(tail == tail2)) {
// tail->next 为空,代表可以插入 node 节点
if (next_ptr == 0) {
// 给 next 节点打上标签
tagged_node_handle new_tail_next(node_handle, next.get_next_tag());
// 如果 tail_node->next 等于 next 时,就把 new_tail_next 赋给 tail_node->next, 返回 true
// 如果 tail_node->next 不等于 next 时,说明有其他线程已经修改了 tail_node->next, 则返回 false 重新来
// compare_exchange_weak:可能返回虚假失败,且虚假失败时不会修改函数参数 expected 值。性能相对高
// compare_exchange_strong:不会出现虚假失败,性能相对低
// 这里概率性的会出现真的失败从而继续循环,选择 compare_exchange_weak 即使虚假失败,也无伤大雅。而且大大提高了性能
if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) {
// 给 tail 节点打上标签
tagged_node_handle new_tail(node_handle, tail.get_next_tag());
// 最后更新 tail 的指向,并返回 true

// 这里为什么不判断交换失败的情况??
// 因为假设线程 T1,上面 if 条件中 tail_node->next.compare_exchange_weak(next, new_tail_next) 执行成功的话
// tail_node->next = new_tail_next 操作成功,那么其他所有随后线程的 compare_exchange_weak 都会失败,然后就会再循环
// 此时,如果 T1 线程还没有更新 tail 指针,其他的线程就会继续失败,因为 tail_node->next 不为空了
// 直到 T1 线程更新完 tail 指针,其他的线程才可以拿到新的 tail 指针,继续操作。
// 也就意味着只要 tail_node->next 赋值成功,此线程对 tail 指针就时独占了,tail 指针必然可以被更新,不用担心失败

// 这也引入了一个新的问题,要是某个线程在这里停掉或者挂掉,那岂不是其他线程进入了死循环?
// 因此请看下面的 else 部分代码,如果 tail_node->next 不为空,则更新 tail 指针,使 tail 指针指向下一个。
// 而且更新 tail 指针相当于对所有的线程都更新了 tail 指针
tail_.compare_exchange_strong(tail, new_tail);
return true;
}
}
else {
// 如果 tail->next 不为空,则说明其他线程已经添加了节点到 tail->next 上, 因此更新 tail 的指针指向下一个
// 在更新前给 tail 指针打上标签
tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
tail_.compare_exchange_strong(tail, new_tail);
}
}
}
}

// 线程安全的 pop 底层调用函数,从头节点取出
template <typename U>
bool pop (U & ret)
{
// 如果队列为空,则 head 和 tail 指向同一个节点
for (;;) {
// 取出 head 指针
tagged_node_handle head = head_.load(memory_order_acquire);
node * head_ptr = pool.get_pointer(head);
// 取出 tail 指针
tagged_node_handle tail = tail_.load(memory_order_acquire);
// 取出 head->next 指针和其 node 节点
tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
node * next_ptr = pool.get_pointer(next);

// 如果 head 指针已经移动了,则重新取 head 指针
tagged_node_handle head2 = head_.load(memory_order_acquire);
if (BOOST_LIKELY(head == head2)) {
// 如果 head 指针和 tail 指针指在同一个节点,且 head-next 为空,说明队列此时是空的

// head 指针和 tail 指针指向同一个节点,已经说明了队列为空了,为什么还要加上 head->next(next_ptr) 为空这个条件呢?
// 因为当 push 操作做了一半,做到了将新 node 节点插到 tail->next 上时,还没有更新 tail 指针。
// 此时队列中是有元素的,而且 head 和 tail 指针的指向相同
if (pool.get_handle(head) == pool.get_handle(tail)) {
// 到这里,队列中一定没有元素,返回 false
if (next_ptr == 0)
return false;
// 如果是上述情况 head == tail,但队列中有元素,则说明 tail 指针落后了,更新 tail 指针且更新前加标签
tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
tail_.compare_exchange_strong(tail, new_tail);

} else {
// else 分支说明 head 和 tail 没有指向同一个 node 节点,为什么还要判断 head->next 是否为空呢?
// 由于 node 节点底层的内存空间是复用的,在分配节点时清除了 next_ptr,因此可能会看到一个空指针
// 当然这个判断与具体的实现有关
if (next_ptr == 0)
continue;
// 将 head->next 指向的 node 节点的数据拷贝给 ret
detail::copy_payload(next_ptr->data, ret);

tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
// 更新 head 指针指向 head->next
// 这里什么情况会失败呢?就是别的线程已经更新了 head 指针,并且拿走了 head 指针所指向的数据
// 因此当前线程就需要重新循环,重新获取 head 指针,直到更新成功为止
if (head_.compare_exchange_weak(head, new_head)) {
// 清理原来老的 dummy 节点
pool.template destruct<true>(head);
return true;
}
}
}
}
}
};

查看更多

研读 Java 语言中线程安全队列的实现

研读 Java 语言中线程安全队列的实现

BlockingQueue 和 ConcurrentLinkedQueue (java)实现

Java提供的线程安全的 Queue 可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue。以下的代码基于 openjdk version "11.0.8"

BlockingQueue 是一个接口,具体的实现有很多,如 ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockQueue、SynchronousQueue。就拿 ArrayBlockingQueue 来看吧

一、ArrayBlockingQueue 实现

ArrayBlockingQueue 提供了很多 API 供我们使用,在插入数据和获取数据我只选择了两个具有代表性的 API 进行分析,其余相似的 API 都是换汤不换药,基本的逻辑思想都是一致的。下面代码只罗列出比较重要的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 底层用于存储数据节点的数组
final Object[] items;
// 移除元素现在所处的位置索引
int takeIndex;
// 插入元素现在所处的位置索引
int putIndex;
// 队列中元素的个数
int count;
// 可重入的独占锁
final ReentrantLock lock;
// 为了移除元素的等待条件
private final Condition notEmpty;
// 为了插入元素的等待条件
private final Condition notFull;

// 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
// 首先加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 当队列元素个数已满的时候,等待一定时间再去判断,如果还是满的就退出
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
// 队列有空间,则插入元素
enqueue(e);
return true;
} finally {
// 最后解锁
lock.unlock();
}
}
// 向队列尾插入元素
private void enqueue(E x) {
final Object[] items = this.items;
// 在 putIndex 位置添加数据
items[putIndex] = x;
// 因为队列底层使用环形数组,所以当达到数组长度时,putIndex 重头再来
if (++putIndex == items.length)
putIndex = 0;
// 元素个数自增
count++;
// 添加完数据后,说明数组中有数据了,所以可以唤醒 notEmpty 条件对象等待队列(链表)中第一个可用线程去获取数据
notEmpty.signal();
}

// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
// 加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列为空,则等待一定时间再判断
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
// 取出数据
return dequeue();
} finally {
lock.unlock();
}
}
// 从队列头取出数据
private E dequeue() {
final Object[] items = this.items;
// 提取 takeIndex 位置上的数据
E x = (E) items[takeIndex];
// 清空 takeIndex 位置上的数据
items[takeIndex] = null;
// 环形数组
if (++takeIndex == items.length)
takeIndex = 0;
// 数组中元素个数减一
count--;
// 提取完数据后,说明数组中有空位,所以可以唤醒 notFull 条件对象的等待队列(链表)中的第一个可用线程去写数据
notFull.signal();
return x;
}
}

查看更多

思考与展望

思考与展望

一、无锁队列一定优于有锁队列吗?

不一定,对于 CAS 实现的硬件级的互斥,其单次操作性能比相同条件下的应用层的较为高效,但当多个线程并发时,硬件级的互斥引入的代价与应用层的锁争用同样令人惋惜。因此如果纯粹希望通过使用 CAS 无锁算法及相关数据结构而带来程序性能的大量提升是不可能的,硬件级原子操作使应用层操作变慢,而且无法再度优化。相反通过对有锁多线程程序的良好设计,可以使程序性能没有任何下降,可以实现高度的并发性。

但是我们也要看到应用层无锁的好处,比如不需要程序员再去考虑死锁、优先级反转等棘手的问题,因此在对应用程序不太复杂,而对性能要求稍高时,可以采用有锁多线程。而程序较为复杂,性能要求满足使用的情况下,可以使用应用级无锁算法。

二、推荐读物

查看更多

实现一个无锁的线程安全队列

实现一个无锁的线程安全队列

我们前面铺垫了很多基础知识,并且研读了优秀的源码。接下来我们要亲手实现一个无锁的线程安全队列,来真正的体会了解下无锁化编程。

我们首先说明我们的实现方式:

  • 使用数组作为数据的存储格式,也即我们的队列的底层是以数组进行存储
  • 规定队列的容量,防止出现无容量上限的队列大量占用内存的情况。
查看更多

研读 Go 语言 channel 的实现

研读 Go 语言 channel 的实现

首先明确go语言的设计模块:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。这样在我看来让 go 语言代码更加整洁。因此 go 语言中 Goroutine 之间会通过 Channel 传递数据。基于go 1.15 版本,Channel 的实现。

一、 Channel 底层数据结构

chan 的底层数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
type hchan struct {
qcount uint // 元素个数
dataqsiz uint // 环形队列的长度
buf unsafe.Pointer // 指向环形队列的指针
elemsize uint16 // 环形队列中每个元素的大小
closed uint32 // chan 是否被关闭
elemtype *_type // 环形队列中元素的类型
sendx uint // 环形队列中发送操作处理到的位置
recvx uint // 环形队列中接收操作处理到的位置
recvq waitq // 处于阻塞状态的接收 Goroutine 双向链表
sendq waitq // 处于阻塞状态的发送 Goroutine 双向链表
lock mutex // 互斥锁
}

chan 使用 make 关键字创建,可以带缓冲区的异步 Channel 和不带缓冲区的同步 Channel。这里对创建过程不做赘述。基本上分为三种情况:

查看更多