一、锁的基本思想

  • 通过加锁来保证临界区的原子性
lock_t mutex;
...
lock(&mutex);	
// 获取锁,如果已被占用, 则等待,直到获得锁为止
balance = balance + 1;
unlock(&mutex);
// 释放锁
  • 如何评价锁的实现
    • 有效性:只有一个线程能拿到锁
    • 公平性:所有线程都有机会拿到锁
      • 不会出现饿死情况
    • 性能:时间开销小

锁的实现一般要使用某种特 殊硬件指令,但也可以不用

  • 锁的实现:用中断屏蔽指令(例如x86的CLI)
    • 只适用于单处理器系统
    • 需要在内核态运行(开关中断是特权指令)
      • 需要运行所有线程执行特权操作
    • 影响对中断的及时响应
void lock() {
    DisableInterrupts();
}
void unlock() {
    EnableInterrupts();
}

二、自旋锁

  • 通过忙等待(Busy Waiting)的方式不断检查锁的状态
    • 通过循环(自旋)实现
typedef struct __lock_t { 
    int flag; 
} lock_t;

void init(lock_t *mutex) {
    // 0 → lock is available, 1 → held
    mutex->flag = 0;
}

void lock(lock_t *mutex) {
    while (mutex->flag == 1)  // TEST the flag
        ;  // spin-wait (do nothing)
    mutex->flag = 1;  // now SET it !
}

void unlock(lock_t *mutex) {
    mutex->flag = 0;
}
typedef struct __lock_t { 
    int flag; 
} lock_t;

void init(lock_t *mutex) {
    // 0 → lock is available, 1 → held
    mutex->flag = 0;
}

void lock(lock_t *mutex) {
L0: 
    if (mutex->flag == 1)  // TEST the flag
        goto L0;  // spin-wait (do nothing)
    mutex->flag = 1;  // now SET it !
}

void unlock(lock_t *mutex) {
    mutex->flag = 0;
}

这个锁有问题吗?

缺乏原子性:对 mutex->flag 的检查和设置不是原子操作。在多线程环境中,两个线程可能同时读取 flag 的值并都认为锁是可用的,从而导致数据竞争和不一致的状态。

enter image description here

TestAndSet指令

  • 使用硬件保证指令的原子性
int TestAndSet(int *ptr, int new) {
    int old = *ptr; // fetch old value at ptr
    *ptr = new;     // store ‘new’ into ptr
    return old;     // return the old value
}
typedef struct __lock_t {
    int flag;
} spinlock_t;

void spin_init(spinlock_t *lock) {
    // 0 indicates that lock is available,
    // 1 that it is held
    lock->flag = 0;
}

void spin_lock(spinlock_t *lock) {
    while (TestAndSet(&lock->flag, 1) == 1)
        ; // spin-wait (自旋等待)
}

void spin_unlock(spinlock_t *lock) {
    lock->flag = 0;
}

LL/SC指令对(MIPS)

  • LL(Load-Linked): 链接的加载
  • SC(Store-Conditional):条件式存储
int LoadLinked(int *ptr) {
    return *ptr;
}

int StoreConditional(int *ptr, int value) {
    if (/* 自从上次LoadLinked这个地址以来*ptr没被更新过 */) {
        *ptr = value;
        return 1; // success!
    } else {
        return 0; // failed to update
    }
}

void lock(lock_t *lock) {
    while (1) {
        while (LoadLinked(&lock->flag) == 1)
            ; // spin until it’s zero
        if (StoreConditional(&lock->flag, 1) == 1)
            return; // if 成功设为1: all done
        // otherwise: 重试
    }
}

void unlock(lock_t *lock) {
    lock->flag = 0;
}

如何解决公平性问题?

FetchAndAdd指令

  • 原子地增加一个内存单元的值,返回旧的值
int FetchAndAdd(int *ptr) {
    int old = *ptr;
    *ptr = old + 1;
    return old;
}
  • Ticket lock: 可避免饿死
typedef struct __lock_t {
    int ticket;
    int turn;
} lock_t;

void lock_init(lock_t *lock) {
    lock->ticket = 0;
    lock->turn = 0;
}

void lock(lock_t *lock) {
    int myturn = FetchAndAdd(&lock->ticket);
    
    while (lock->turn != myturn)
        ; // spin
}

void unlock(lock_t *lock) {
    FetchAndAdd(&lock->turn);
}

自旋锁的总结

  • 优点
    • 减少上下文切换(线程不需要进入阻塞状态),实现简单
    • 适用场景:多处理器系统、短期临界区
  • 缺点
    • CPU资源浪费 如何摆脱“自旋”
    • 不适用场景:单处理器系统、高并发环境

三、非自旋锁

非自旋锁的实现:主动放弃CPU

typedef struct __lock_t { 
    int flag; 
} lock_t;

void init(lock_t *m) {
    m->flag = 0; 
}

void lock(lock_t *m) {
    while (TestAndSet(&m->flag, 1) == 1) 
        yield(); // give up the CPU
}

void unlock(lock_t *m) {
    m->flag = 0; 
}

依然低效: 频繁上下文切换

依然可能饿死

非自旋锁的实现:休眠替代自旋

typedef struct __lock_t { 
    int flag; 
    queue_t *q; 
} lock_t;

void init(lock_t *m) {
    m->flag = 0; 
    queue_init(m->q);
}

void lock(lock_t *m) {
    while (TestAndSet(&m->flag, 1) == 1) { 
        queue_add(m->q, gettid());
        park();
    }
}

void unlock(lock_t *m) {
    m->flag = 0; 
    unpark(queue_remove(m->q));
}

问题:可能出现竞态条件。

typedef struct __lock_t { 
    int flag; 
    queue_t *q;
    spinlock_t guard;
} lock_t;

void init(lock_t *m) {
    m->flag = 0;
    queue_init(m->q);
    spin_init(&m->guard);
}

void lock(lock_t *m) {
    spin_lock(&m->guard);
    while (TestAndSet(&m->flag, 1) == 1) {
        queue_add(m->q, gettid());
        park();
    }
    spin_unlock(&m->guard);
}

void unlock(lock_t *m) {
    spin_lock(&m->guard);
    m->flag = 0;
    unpark(queue_remove(m->q));
    spin_unlock(&m->guard);
}

问题:可能死锁

void init(lock_t *m) {
    m->flag = 0; 
    queue_init(m->q);
    spin_init(&m->guard);
}

void lock(lock_t *m) {
    spin_lock(&m->guard);
    while (TestAndSet(&m->flag, 1) == 1) {
        queue_add(m->q, gettid());
        spin_unlock(&m->guard);
        // wakeup/waiting race 
        // 如果此时unlock()
        park(); // This will block until unpark is called
        spin_lock(&m->guard);
    }
    spin_unlock(&m->guard);
}

typedef struct __lock_t { 
    int flag; 
    queue_t *q;
    spinlock_t guard;
} lock_t;

// wakeup/waiting race condition handling
void unlock(lock_t *m) {
    spin_lock(&m->guard);
    m->flag = 0; 
    unpark(queue_remove(m->q));
    spin_unlock(&m->guard);
}

问题:如果在park前spin_unlock后时间片耗尽,转而调用其他线程的unlock,原线程会陷入休眠中无法释放。

typedef struct __lock_t { 
    int flag; 
    queue_t *q;
    spinlock_t guard; // 保护flag和q
} lock_t;

void init(lock_t *m) {
    m->flag = 0; 
    queue_init(m->q);
    spin_init(&m->guard);
}

void lock(lock_t *m) {
    spin_lock(&m->guard);
    while (TestAndSet(&m->flag, 1) == 1) {
        queue_add(m->q, gettid());
        setpark();	// !
        spin_unlock(&m->guard);
        park(); // 如果之前执行了unpark, 则park直接返回
        spin_lock(&m->guard);
    }
    spin_unlock(&m->guard);
}

void unlock(lock_t *m) {
    spin_lock(&m->guard);
    m->flag = 0; 
    unpark(queue_remove(m->q));
    spin_unlock(&m->guard);
}

最终版:

typedef struct __lock_t { 
    int flag; 
    queue_t *q; 
    spinlock_t guard;   // 保护flag和q
} lock_t;

void lock_init(lock_t *m) {
    m->flag = 0;
    queue_init(m->q);
    spin_init(&m->guard); 
}

void lock(lock_t *m) {
    spin_lock(&m->guard);
    if (m->flag == 0) {
        m->flag = 1; // lock is acquired
        spin_unlock(&m->guard);
    } else {
        queue_add(m->q, gettid());
        setpark();
        spin_unlock(&m->guard);
        park(); // 被唤醒后直接拿到锁
    }
}

void unlock(lock_t *m) {
    spin_lock(&m->guard);
    
    if (queue_empty(m->q)) {
        m->flag = 0; // 解锁; no one wants it
    } else {
        unpark(queue_remove(m->q)); 
    }
    
    // hold lock (for next thread!)
    spin_unlock(&m->guard);
}

非自旋锁的实现:futex

  • Linux提供futex(类似于Solaris的park和unpark)
  • futex_wait(address, expected)
    • 使调用线程休眠
    • 如果address的值不等于expected,则立即返回
  • futex_wake(address)
    • 唤醒队列里等待的一个线程
typedef struct __lock_t { 
    int flag; 
} lock_t;
void init(lock_t *m) {
    m->flag = 0;
}

void lock(lock_t *m) {
    while (TestAndSet(&m->flag, 1) == 1) {
        futex_wait(&m->flag, 1);
    }
}

void unlock(lock_t *m) {
    m->flag = 0;
    futex_wake(&m->flag); // 修正为 m->flag
}
  • mutex_lock(int *mutex)
    • 利用一个整数记录锁的占用情况(最高位)和等待着个数(其他位)
void mutex_lock(int *mutex) {
    int v;
    /* Bit 31 was clear, 得到锁 */
    if (atomic_bit_test_set(mutex, 31) == 0)
        return;
    atomic_increment(mutex);
    while (1) {
	    if (atomic_bit_test_set(mutex, 31) == 0) {
	        atomic_decrement(mutex);
	        return;
	    }
	    /* We have to wait now. Make sure locked */
	    v = *mutex;
	    if (v >= 0) continue;
	    futex_wait(mutex, v);
	}
}

void mutex_unlock(int *mutex) {
    /* check if there are threads waiting for the mutex */
    if (atomic_add_zero(mutex, 0x80000000))
        return;
    /* There are other threads waiting for this mutex, wake one of them up */
    futex_wake(mutex);
}

非自旋锁的总结

  • 优点
    • 避免忙等待,不会占用CPU资源
    • 适用场景:长期临界区,高并发环境
  • 缺点
    • 上下文切换开销大
    • 不适用场景:短期临界区

两阶段锁

  • Linux采用的一种锁方案(古老但一直被采用)
    • 第一阶段采用自旋一段时间
    • 如果第一阶段没有获得锁,则会进入休眠状态

Hybrid是OS设计中的一种重要设计思想。

“锁”小结

  • 锁的基本思想
    • 评价:有效、公平、性能
    • 自旋锁
      • TestAndSet、LL/SC、FetchAndAdd
    • 非自旋锁
      • yield()、park和unpark、futex