同步机制---锁

我们的锁使用 信号量来实现。信号量包括两个操作

  • 增加操作:将信号量的值加 1,唤醒在此信号量上等待的线程
  • 减少操作:判断信号量是否大于 0。若信号量大于 0,则将信号量减一;若信号量等于 0,当前线程将自己阻塞,以在此信号量上等待

线程的阻塞是线程自己发出的动作,也就是线程自己阻塞自己,并不是被别人阻塞的,阻塞是线程主动的行为。已阻塞的线程是由别人来唤醒的,唤醒是被动的。

注意,线程阻塞时,线程的时间片还没用完,在唤醒之后,线程会继续在剩余时间片内运行,调度器不会为他充满时间片。

一、线程阻塞

线程主动阻塞,等待被唤醒。给此线程设置非就绪状态,让调度器无法再调度他,也就是当前线程不能再被加到就绪队列中。

1
2
3
4
5
6
7
8
9
10
11
12
void thread_block(enum task_status stat) {
// 只有 stat 取如下三种状态才不会被调度
ASSERT(((stat == TASK_BLOCKED) || (stat == TASK_WAITING) || (stat == TASK_HANGING)));
enum intr_status old_status = intr_disable();
struct task_struct* cur_thread = running_thread();
// 置其状态为 stat
cur_thread->status = stat;
// 将当前线程换下处理器,由于不会将此线程加入到就绪队列,所以不会被调度
schedule();
// 待当前线程被解除阻塞后才继续运行下面的 intr_set_status
intr_set_status(old_status);
}

将某线程解除阻塞,也就是唤醒某线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void thread_unblock(struct task_struct* pthread) {
enum intr_status old_status = intr_disable();
ASSERT(((pthread->status == TASK_BLOCKED) || (pthread->status == TASK_WAITING)
|| (pthread->status == TASK_HANGING)));
if (pthread->status != TASK_READY) {
// ASSERT 是调试阶段用的,运行阶段我们用 PANIC 返回错误信息
ASSERT(!elem_find(&thread_ready_list, &pthread->general_tag));
if (elem_find(&thread_ready_list, &pthread->general_tag)) {
PANIC("thread_unlock: blocked thread in ready_list\n");
}
// 放到队列的最前面,让他尽快得到调度
list_push(&thread_ready_list, &pthread->general_tag);
pthread->status = TASK_READY;
}
intr_set_status(old_status);
}

二、锁的实现

1
2
3
4
5
6
struct semaphore {
// 信号量值
uint8_t value;
// 保存在此信号量上阻塞的线程
struct list waiters;
};

信号量的结构体,waiters 用来记录在此信号量上等待(阻塞)的所有线程。

注意:信号量仅仅是一个编程理念,是个程序设计结构,只要具备信号量初值和等待线程这两个必要元素即可。

1
2
3
4
5
6
7
8
9
struct lock {
// 锁的持有者
struct task_struct* holder;
// 用二元信号量实现锁
struct semaphore sem;
// 锁的持有者重复申请锁的次数
// 用于规避重复申请锁的情况
uint32_t holder_repeat_nr;
};

holder 成员表示锁的持有者,即那个线程成功申请了锁。

成员 holder_repeat_nr 用来累积锁的持有者重复申请锁的次数,释放锁的时候会参考此变量的值。原因是一般情况下我们应该在进入临界区之前加锁,但有时候可能加锁之后,再次重复加锁。这样可能会释放两次,造成错误,因此释放锁时根据 holder_repeat_nr 此值来执行具体动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 初始化信号量
void sema_init(struct semaphore *psema, uint8_t value) {
psema->value = value;
list_init(&psema->waiters);
}

// 初始化锁
void lock_init(struct lock *plock) {
plock->holder = NULL;
plock->holder_repeat_nr = 0;
// 信号量初值设置为 1,也即二元信号量
sema_init(&plock->sem, 1);
}

如上是信号量的初始化过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 信号量的 down 操作
void sema_down(struct semaphore *psema) {
// 关中断来保证原子操作
enum intr_status old_status = intr_disable();
// 如果信号量值为 0,表示已经被别人持有
for (; psema->value == 0;) {
ASSERT(!elem_find(&psema->waiters, &running_thread()->general_tag));
// 当前线程不应该已经在信号量的 waiters 队列中
if (elem_find(&psema->waiters, &running_thread()->general_tag)) {
PANIC("sema down: thread blocked has been in waiters_list\n");
}
// 若信号量的值等于 0,则当前线程把自己加入该锁的等待队列,然后阻塞自己
list_append(&psema->waiters, &running_thread()->general_tag);
// 阻塞当前线程,直到被唤醒
thread_block(TASK_BLOCKED);
}
// 若 value 为 1 或被唤醒后,即获取到了锁
psema->value--;
ASSERT(psema->value == 0);
// 恢复之前的中断状态
intr_set_status(old_status);
}

注意,在判断信号量是否为 0 时,使用的是循环的方式。因为锁本身是公共的资源,大家也要通过竞争的方式去获得,因此想要获得锁的线程不止一个,当阻塞的线程被唤醒后,也不一定能获得资源,只是再次获取了去竞争锁的机会,所以判断信号量的值最好使用循环方式,而非 if 方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 信号量的 up 操作
void sema_up(struct semaphore *psema) {
// 关中断,保证原子操作
enum intr_status old_status = intr_disable();
ASSERT(psema->value == 0);
if (!list_empty(&psema->waiters)) {
// 唤醒一个被阻塞的线程
// 所谓的唤醒只不过是将阻塞中的线程加入到阻塞队列,将来可以参与调度
// 而且当前是关中断的状态,所以调度器并不会被触发,所以 psema->value++ 是安全的
struct task_struct* thread_blocked = elem2entry(struct task_struct, general_tag, list_pop(&psema->waiters));
thread_unblock(thread_blocked);
}
psema->value++;
ASSERT(psema->value == 1);
// 恢复之前的中断状态
intr_set_status(old_status);
}

注意,所谓的唤醒并不是指马上就运行,而是重新加入就绪队列,将来可以参与调度,运行是将来的事儿。而且是在关中断的情况下,所以调度器并不会被触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void lock_acquire(struct lock* plock) {
// 注意重复获取锁的情况
if (plock->holder != running_thread()) {
sema_down(&plock->sem);
plock->holder = running_thread();
ASSERT(plock->holder_repeat_nr == 0);
plock->holder_repeat_nr = 1;
} else {
plock->holder_repeat_nr++;
}
}

void lock_release(struct lock* plock) {
ASSERT(plock->holder == running_thread());
if (plock->holder_repeat_nr > 1) {
plock->holder_repeat_nr--;
return;
}
ASSERT(plock->holder_repeat_nr == 1);
// 必须放在 sema_up 操作之前
// 这里是未关中断的状态,因此需要先给结构赋值,再进行 sema_up 操作
// 如果先进行 sema_up 操作,然后被调度器换下 CPU,其他线程给 plock 结构赋值了
// 此线程被唤醒后又更改了 plock 结构,就会导致错误
plock->holder = NULL;
plock->holder_repeat_nr = 0;
sema_up(&plock->sem);
}

最后封装好加锁和解锁函数。临界区是关中断的。