线程间同步
如果多个任务间有相互依赖关系,那么任务或者线程之间需要同步。C++ 标准库中提供了条件变量和 feature 来帮助我们处理这种场景
一、条件变量
库 #include <condition_variable>
中有两个条件变量的实现: std::condition_variable
和 std::condition_variable_any
std::condition_variable
仅限于和 std::mutex
一起工作。std::condition_variable_any
更加灵活,可以组合的条件比较广泛。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| std::mutex mut; std::queue<data_chunk> data_queue; std::condition_variable data_cond;
void data_product() { while (true) { data_chunk const data = prepare_data(); std::lock_guard<std::mutex> lk(mut); data_queue.push(data); data_cond.notify_one(); } }
void data_process() { while (true) { std::unique_lock<std::mutex> lk(mnt); data_cond.wait(lk, []{return !data_queue.empty(); }) data_chunk data = data_queue.front(); data_queue.pop(); lk.unlock(); process(data); if (is_last_chunk(data)) break; } }
|
wait 方法,当收到 notify_one 的调用通知条件变量时,线程从睡眠状态苏醒,获取互斥元上的锁,并检查条件,如果条件满足,就从 wait 返回,互斥元仍然锁定;如果条件不满足,该线程解锁互斥元,并在此陷入睡眠等待。
虚假唤醒,对于虚假唤醒的解释主要有两种:
- 一种是由 notify_all 唤醒之后却得不到需要的数据
- 一种是有的系统会出于某种原因唤醒正在阻塞队列的线程,这时候消费者线程也是得不到需要的数据的(因为不是由生产者线程唤醒)。
两种说法兼而有之,都没有错,更准确的说法应该是第一种。
二、使用条件变量的线程安全队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| template<typename T> class ThreadSafeQueue { private: mutable std::mutex mut; std::queue<T> data_queue; std::condition_variable data_cond;
public: ThreadSafeQueue() {} ThreadSafeQueue(ThreadSafeQueue const& other) { std::lock_guard<std::mutex> lk(other.mut); data_queue = other.data_queu; }
void push(T new_value) { std::lock_guard<std::mutex> lk(mut); data_queue.push(new_value); data_cond.notify_one(); }
void wait_and_pop(T& value) { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this]{return !data_queue.empty(); }); value = data_queue.front(); data_queue.pop(); }
std::shared_ptr<T> wait_and_pop() { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this]{return !data_queue.empty(); }); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; }
bool try_pop(T& value) { std::lock_guard<std::mutex> lk(mut); if (data_queue.empty()) { return false; } value = data_queue.front(); data_queue.pop(); return true; }
std::shared_ptr<T> try_pop() { std::lock_guard<std::mutex> lk(mut); if (data_queue.empty()) { return std::make_shared<T>(); } std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; }
bool empty() const { std::lock_guard<std::mutex> lk(mut); return data_queue.empty(); } };
|
如上,使用条件变量和锁用来同步线程,保证数据写入和读取的安全。