undefined

基于 thread 的多线程开发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class scoped_thread {
public:
template<typename... Arg>
explicit scoped_thread(Arg&&... arg) : thread_(std::forward<Arg>(arg)...) {}
scoped_thread(scoped_thread&& other) noexcept : thread_(std::move(other.thread_)) {}
scoped_thread(const scoped_thread&) = delete;
~scoped_thread() {
if (thread_.joinable()) {
thread_.join();
}
}

private:
std::thread thread_;
};
  1. 我们使用了可变模板和完美转发来构造 thread 对象。
  2. thread 不能拷贝,但可以移动;我们也类似地实现了移动构造函数。
  3. 只有 joinable(已经 join 的、已经 detach 的或者空的线程对象都不满足 joinable)的 thread 才可以对其调用 join 成员函数,否则会引发异常。

future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
using namespace std;
int work()
{
// 假装我们计算了很久
this_thread::sleep_for(2s);
return 42;
}

int main()
{
auto fut = async(launch::async, work);
// 干一些其他事
cout << "I am waiting now\n";
cout << "Answer: " << fut.get()
<< '\n';
}

如上,线程之间同步数据。分析:

  1. 调用 async 可以获得一个未来量,launch::async 是运行策略,告诉函数模板 async 应当在新线程里异步目标函数。在一些老版本的GCC 中,不指定运行策略,默认不会起新线程
  2. async 函数模板可以根据参数来推导出返回类型,如上,返回类型为 futue<int>
  3. 在未来量上调用 get 成员函数可以获得其结果。这个结果可以是返回值,也可以是异常,即如果 work 抛出了异常,那 main 里执行 fut.get() 时也会得到同样的异常,需要有相同的异常处理代码才能正常工作
  4. 一个 future 上只能调用一次 get 函数,第二次调用为 未定义行为,通常导致程序奔溃
  5. 这样以来,自然一个 future 是不能直接在多个线程里使用。解决:要么直接拿 future 来移动构造一个 shared_future [12],要么调用 future 的 share 方法来生成一个 shared_future,结果就可以在多个线程里用了——当然,每个 shared_future 上仍然还是只能调用一次 get 函数。

promise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <utility>
using namespace std;

class scoped_thread {
// 定义同上,略
};

void work(promise<int> prom)
{
// 假装我们计算了很久
this_thread::sleep_for(2s);
prom.set_value(42);
}

int main()
{
promise<int> prom;
auto fut = prom.get_future();
scoped_thread th{work,
move(prom)};
// 干一些其他事
cout << "I am waiting now\n";
cout << "Answer: " << fut.get()
<< '\n';
}

promise 和 future 在这里成对出现,可以看作是一个一次性管道:有人需要兑现承诺,往 promise 里放东西(set_value);有人就像收期货一样,到时间去 future 里拿(get)即可。我们把 prom 移动给新线程,这样老线程就完全不需要管理它的生命周期里。

注意点:

  • 不需要在一个函数结束的时候才去设置 future 的值
  • 一个 promise 和 future 只能使用一次,既不能重复设置,也不能重复取

代码的执行顺序问题

  • 为了优化的必要,编译器是可以调整代码的执行顺序的。唯一的要求是,程序的“可观测”外部行为是一致的
  • 处理器也会对代码的执行顺序进行调整(所谓的CPU乱序执行),在单处理器的情况下,这种乱序无法被程序观察到;但在多处理器的情况下,在另外一个处理器上运行的另一个线程就可能会察觉到这种不同顺序的后果了

对于双重加锁的单例模式就证明了上述的问题

volatile

volatile 的语义只是防止编译器“优化”掉对内存的读写而已。它的合适用法,目前主要用来读写映射到内存地址上的 I/O 操作

由于 volatile 不能在多处理器的环境下确保多个线程能看到同样顺序的数据变化,在今天的通用应用程序中,不应该看到 volatile 的出现

c++11 的内存模型

内存屏障和获得、释放语义: atomic 的使用

  • 获得是一个对内存的读操作,当前线程的任何后面的读写操作都不允许重排到这个操作的前面去
  • 释放是一个对内存的写操作,当前线程的任何前面的读写操作都不允许重排到这个操作的后面去

对于双重加锁的单例模式的改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 头文件
class singleton {
public:
static singleton* instance();

private:
static mutex lock_;
static atomic<singleton*>
inst_ptr_;
};

// 实现文件
mutex singleton::lock_;
atomic<singleton*>
singleton::inst_ptr_;

singleton* singleton::instance()
{
singleton* ptr = inst_ptr_.load(
memory_order_acquire);
if (ptr == nullptr) {
lock_guard<mutex> guard{lock_};
ptr = inst_ptr_.load(
memory_order_relaxed);
if (ptr == nullptr) {
ptr = new singleton();
inst_ptr_.store(
ptr, memory_order_release);
}
}
return inst_ptr_;
}

atomic 的使用

原子操作有三类:

  • 读:在读取的过程中,读取位置的内容不会发生任何变动。
  • 写:在写入的过程中,其他执行线程不会看到部分写入的结果。
  • 读‐修改‐写:读取内存、修改数值、然后写回内存,整个操作的过程中间不会有其他写入操作插入,其他执行线程不会看到部分写入的结果。

头文件定义了内存序,分别是:

  • memory_order_relaxed:松散内存序,只用来保证对原子对象的操作是原子的
  • memory_order_consume:目前不鼓励使用,我就不说明了
  • memory_order_acquire:获得操作,在读取某原子对象时,当前线程的任何后面的读写操作都不允许重排到这个操作的前面去,并且其他线程在对同一个原子对象释放之前的所有内存写入都在当前线程可见
  • memory_order_release:释放操作,在写入某原子对象时,当前线程的任何前面的读写操作都不允许重排到这个操作的后面去,并且当前线程的所有内存写入都在对同一个原子对象进行获取的其他线程可见
  • memory_order_acq_rel:获得释放操作,一个读‐修改‐写操作同时具有获得语义和释放语义,即它前后的任何读写操作都不允许重排,并且其他线程在对同一个原子对象释放之前的所有内存写入都在当前线程可见,当前线程的所有内存写入都在对同一个原子对象进行获取的其他线程可见
  • memory_order_seq_cst:顺序一致性语义,对于读操作相当于获取,对于写操作相当于释放,对于读‐修改‐写操作相当于获得释放,是所有原子操作的默认内存序(除此之外,顺序一致性还保证了多个原子量的修改在所有线程里观察到的修改顺序都相同;我们目前的讨论暂不涉及多个原子量的修改)

atomic 有下面这些常用的成员函数:

  • 默认构造函数(只支持零初始化)
  • 拷贝构造函数被删除
  • 使用内置对象类型的构造函数(不是原子操作)
  • 可以从内置对象类型赋值到原子对象(相当于 store)
  • 可以从原子对象隐式转换成内置对象(相当于 load)
  • store,写入对象到原子对象里,第二个可选参数是内存序类型
  • load,从原子对象读取内置对象,有个可选参数是内存序类型
  • is_lock_free,判断对原子对象的操作是否无锁(是否可以用处理器的指令直接完成原子操作)
  • exchange,交换操作,第二个可选参数是内存序类型(这是读‐修改‐写操作)
  • compare_exchange_weak 和 compare_exchange_strong,两个比较加交换(CAS)的版本,你可以分别指定成功和失败时的内存序,也可以只指定一个,或使用默认的最安全内存序(这是读‐修改‐写操作)
  • fetch_add 和 fetch_sub,仅对整数和指针内置对象有效,对目标原子对象执行加或减操作,返回其原始值,第二个可选参数是内存序类型(这是读‐修改‐写操作)
  • ++ 和 –(前置和后置),仅对整数和指针内置对象有效,对目标原子对象执行增一或减一,操作使用顺序一致性语义,并注意返回的不是原子对象的引用(这是读‐修改‐写操作)
  • += 和 -=,仅对整数和指针内置对象有效,对目标原子对象执行加或减操作,返回操作之后的数值,操作使用顺序一致性语义,并注意返回的不是原子对象的引用(这是读‐修改‐写操作)