重生之我学操作系统——第28章 锁
一、锁的基本思想
- 通过加锁来保证临界区的原子性
lock_t mutex;
...
lock(&mutex);
// 获取锁,如果已被占用, 则等待,直到获得锁为止
balance = balance + 1;
unlock(&mutex);
// 释放锁
- 如何评价锁的实现
- 有效性:只有一个线程能拿到锁
- 公平性:所有线程都有机会拿到锁
- 不会出现饿死情况
- 性能:时间开销小
锁的实现一般要使用某种特 殊硬件指令,但也可以不用
- 锁的实现:用中断屏蔽指令(例如x86的CLI)
- 只适用于单处理器系统
- 需要在内核态运行(开关中断是特权指令)
- 需要运行所有线程执行特权操作
- 影响对中断的及时响应
void lock() {
DisableInterrupts();
}
void unlock() {
EnableInterrupts();
}
二、自旋锁
- 通过忙等待(Busy Waiting)的方式不断检查锁的状态
- 通过循环(自旋)实现
typedef struct __lock_t {
int flag;
} lock_t;
void init(lock_t *mutex) {
// 0 → lock is available, 1 → held
mutex->flag = 0;
}
void lock(lock_t *mutex) {
while (mutex->flag == 1) // TEST the flag
; // spin-wait (do nothing)
mutex->flag = 1; // now SET it !
}
void unlock(lock_t *mutex) {
mutex->flag = 0;
}
typedef struct __lock_t {
int flag;
} lock_t;
void init(lock_t *mutex) {
// 0 → lock is available, 1 → held
mutex->flag = 0;
}
void lock(lock_t *mutex) {
L0:
if (mutex->flag == 1) // TEST the flag
goto L0; // spin-wait (do nothing)
mutex->flag = 1; // now SET it !
}
void unlock(lock_t *mutex) {
mutex->flag = 0;
}
这个锁有问题吗?
缺乏原子性:对 mutex->flag
的检查和设置不是原子操作。在多线程环境中,两个线程可能同时读取 flag
的值并都认为锁是可用的,从而导致数据竞争和不一致的状态。
TestAndSet指令
- 使用硬件保证指令的原子性
int TestAndSet(int *ptr, int new) {
int old = *ptr; // fetch old value at ptr
*ptr = new; // store ‘new’ into ptr
return old; // return the old value
}
typedef struct __lock_t {
int flag;
} spinlock_t;
void spin_init(spinlock_t *lock) {
// 0 indicates that lock is available,
// 1 that it is held
lock->flag = 0;
}
void spin_lock(spinlock_t *lock) {
while (TestAndSet(&lock->flag, 1) == 1)
; // spin-wait (自旋等待)
}
void spin_unlock(spinlock_t *lock) {
lock->flag = 0;
}
LL/SC指令对(MIPS)
- LL(Load-Linked): 链接的加载
- SC(Store-Conditional):条件式存储
int LoadLinked(int *ptr) {
return *ptr;
}
int StoreConditional(int *ptr, int value) {
if (/* 自从上次LoadLinked这个地址以来*ptr没被更新过 */) {
*ptr = value;
return 1; // success!
} else {
return 0; // failed to update
}
}
void lock(lock_t *lock) {
while (1) {
while (LoadLinked(&lock->flag) == 1)
; // spin until it’s zero
if (StoreConditional(&lock->flag, 1) == 1)
return; // if 成功设为1: all done
// otherwise: 重试
}
}
void unlock(lock_t *lock) {
lock->flag = 0;
}
如何解决公平性问题?
FetchAndAdd指令
- 原子地增加一个内存单元的值,返回旧的值
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
- Ticket lock: 可避免饿死
typedef struct __lock_t {
int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}
void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn)
; // spin
}
void unlock(lock_t *lock) {
FetchAndAdd(&lock->turn);
}
自旋锁的总结
- 优点
- 减少上下文切换(线程不需要进入阻塞状态),实现简单
- 适用场景:多处理器系统、短期临界区
- 缺点
- CPU资源浪费 如何摆脱“自旋”
- 不适用场景:单处理器系统、高并发环境
三、非自旋锁
非自旋锁的实现:主动放弃CPU
typedef struct __lock_t {
int flag;
} lock_t;
void init(lock_t *m) {
m->flag = 0;
}
void lock(lock_t *m) {
while (TestAndSet(&m->flag, 1) == 1)
yield(); // give up the CPU
}
void unlock(lock_t *m) {
m->flag = 0;
}
依然低效: 频繁上下文切换
依然可能饿死
非自旋锁的实现:休眠替代自旋
typedef struct __lock_t {
int flag;
queue_t *q;
} lock_t;
void init(lock_t *m) {
m->flag = 0;
queue_init(m->q);
}
void lock(lock_t *m) {
while (TestAndSet(&m->flag, 1) == 1) {
queue_add(m->q, gettid());
park();
}
}
void unlock(lock_t *m) {
m->flag = 0;
unpark(queue_remove(m->q));
}
问题:可能出现竞态条件。
typedef struct __lock_t {
int flag;
queue_t *q;
spinlock_t guard;
} lock_t;
void init(lock_t *m) {
m->flag = 0;
queue_init(m->q);
spin_init(&m->guard);
}
void lock(lock_t *m) {
spin_lock(&m->guard);
while (TestAndSet(&m->flag, 1) == 1) {
queue_add(m->q, gettid());
park();
}
spin_unlock(&m->guard);
}
void unlock(lock_t *m) {
spin_lock(&m->guard);
m->flag = 0;
unpark(queue_remove(m->q));
spin_unlock(&m->guard);
}
问题:可能死锁
void init(lock_t *m) {
m->flag = 0;
queue_init(m->q);
spin_init(&m->guard);
}
void lock(lock_t *m) {
spin_lock(&m->guard);
while (TestAndSet(&m->flag, 1) == 1) {
queue_add(m->q, gettid());
spin_unlock(&m->guard);
// wakeup/waiting race
// 如果此时unlock()
park(); // This will block until unpark is called
spin_lock(&m->guard);
}
spin_unlock(&m->guard);
}
typedef struct __lock_t {
int flag;
queue_t *q;
spinlock_t guard;
} lock_t;
// wakeup/waiting race condition handling
void unlock(lock_t *m) {
spin_lock(&m->guard);
m->flag = 0;
unpark(queue_remove(m->q));
spin_unlock(&m->guard);
}
问题:如果在park前spin_unlock后时间片耗尽,转而调用其他线程的unlock,原线程会陷入休眠中无法释放。
typedef struct __lock_t {
int flag;
queue_t *q;
spinlock_t guard; // 保护flag和q
} lock_t;
void init(lock_t *m) {
m->flag = 0;
queue_init(m->q);
spin_init(&m->guard);
}
void lock(lock_t *m) {
spin_lock(&m->guard);
while (TestAndSet(&m->flag, 1) == 1) {
queue_add(m->q, gettid());
setpark(); // !
spin_unlock(&m->guard);
park(); // 如果之前执行了unpark, 则park直接返回
spin_lock(&m->guard);
}
spin_unlock(&m->guard);
}
void unlock(lock_t *m) {
spin_lock(&m->guard);
m->flag = 0;
unpark(queue_remove(m->q));
spin_unlock(&m->guard);
}
最终版:
typedef struct __lock_t {
int flag;
queue_t *q;
spinlock_t guard; // 保护flag和q
} lock_t;
void lock_init(lock_t *m) {
m->flag = 0;
queue_init(m->q);
spin_init(&m->guard);
}
void lock(lock_t *m) {
spin_lock(&m->guard);
if (m->flag == 0) {
m->flag = 1; // lock is acquired
spin_unlock(&m->guard);
} else {
queue_add(m->q, gettid());
setpark();
spin_unlock(&m->guard);
park(); // 被唤醒后直接拿到锁
}
}
void unlock(lock_t *m) {
spin_lock(&m->guard);
if (queue_empty(m->q)) {
m->flag = 0; // 解锁; no one wants it
} else {
unpark(queue_remove(m->q));
}
// hold lock (for next thread!)
spin_unlock(&m->guard);
}
非自旋锁的实现:futex
- Linux提供futex(类似于Solaris的park和unpark)
- futex_wait(address, expected)
- 使调用线程休眠
- 如果address的值不等于expected,则立即返回
- futex_wake(address)
- 唤醒队列里等待的一个线程
typedef struct __lock_t {
int flag;
} lock_t;
void init(lock_t *m) {
m->flag = 0;
}
void lock(lock_t *m) {
while (TestAndSet(&m->flag, 1) == 1) {
futex_wait(&m->flag, 1);
}
}
void unlock(lock_t *m) {
m->flag = 0;
futex_wake(&m->flag); // 修正为 m->flag
}
- mutex_lock(int *mutex)
- 利用一个整数记录锁的占用情况(最高位)和等待着个数(其他位)
void mutex_lock(int *mutex) {
int v;
/* Bit 31 was clear, 得到锁 */
if (atomic_bit_test_set(mutex, 31) == 0)
return;
atomic_increment(mutex);
while (1) {
if (atomic_bit_test_set(mutex, 31) == 0) {
atomic_decrement(mutex);
return;
}
/* We have to wait now. Make sure locked */
v = *mutex;
if (v >= 0) continue;
futex_wait(mutex, v);
}
}
void mutex_unlock(int *mutex) {
/* check if there are threads waiting for the mutex */
if (atomic_add_zero(mutex, 0x80000000))
return;
/* There are other threads waiting for this mutex, wake one of them up */
futex_wake(mutex);
}
非自旋锁的总结
- 优点
- 避免忙等待,不会占用CPU资源
- 适用场景:长期临界区,高并发环境
- 缺点
- 上下文切换开销大
- 不适用场景:短期临界区
两阶段锁
- Linux采用的一种锁方案(古老但一直被采用)
- 第一阶段采用自旋一段时间
- 如果第一阶段没有获得锁,则会进入休眠状态
Hybrid是OS设计中的一种重要设计思想。
“锁”小结
- 锁的基本思想
- 评价:有效、公平、性能
- 自旋锁
- TestAndSet、LL/SC、FetchAndAdd
- 非自旋锁
- yield()、park和unpark、futex