线程间同步

线程间同步

如果多个任务间有相互依赖关系,那么任务或者线程之间需要同步。C++ 标准库中提供了条件变量和 feature 来帮助我们处理这种场景

一、条件变量

#include <condition_variable> 中有两个条件变量的实现: std::condition_variablestd::condition_variable_any

  • std::condition_variable 仅限于和 std::mutex 一起工作。std::condition_variable_any 更加灵活,可以组合的条件比较广泛。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;

void data_product() {
while (true) {
data_chunk const data = prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one(); // 通知/唤醒等待的线程(如果有的话)
}
}

void data_process() {
while (true) {
std::unique_lock<std::mutex> lk(mnt);
data_cond.wait(lk, []{return !data_queue.empty(); }) // 传入锁以及等待的条件
data_chunk data = data_queue.front();
data_queue.pop();
lk.unlock();
process(data);
if (is_last_chunk(data)) break;
}
}

wait 方法,当收到 notify_one 的调用通知条件变量时,线程从睡眠状态苏醒,获取互斥元上的锁,并检查条件,如果条件满足,就从 wait 返回,互斥元仍然锁定;如果条件不满足,该线程解锁互斥元,并在此陷入睡眠等待。

虚假唤醒,对于虚假唤醒的解释主要有两种:

  • 一种是由 notify_all 唤醒之后却得不到需要的数据
  • 一种是有的系统会出于某种原因唤醒正在阻塞队列的线程,这时候消费者线程也是得不到需要的数据的(因为不是由生产者线程唤醒)。

两种说法兼而有之,都没有错,更准确的说法应该是第一种。

二、使用条件变量的线程安全队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
template<typename T>
class ThreadSafeQueue {
private:
mutable std::mutex mut; // 可变对象
std::queue<T> data_queue;
std::condition_variable data_cond;

public:
ThreadSafeQueue() {}
ThreadSafeQueue(ThreadSafeQueue const& other) {
std::lock_guard<std::mutex> lk(other.mut);
data_queue = other.data_queu;
}

void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty(); });
value = data_queue.front();
data_queue.pop();
}

std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty()) {
return false;
}
value = data_queue.front();
data_queue.pop();
return true;
}

std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty()) {
return std::make_shared<T>();
}
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

如上,使用条件变量和锁用来同步线程,保证数据写入和读取的安全。