02 | Mutex:庖丁解牛看实现
Mutex架构演进分为四个阶段:
- 初版:flag标记锁是否被持有
- 给新人机会:新的goroutine有机会参与竞争锁
- 多给些机会:新来的和被唤醒的goroutine,但带来饥饿问题
- 解决饥饿:解决竞争问题,不让goroutine长久等待
初版互斥锁
通过flag变量,标记当前的锁是否被某个goroutine持有,1为已经被持有,其他goroutine只能等待,flag为0,可以通过CAS将flag置为1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func cas(val *int32, old, new int32) bool func semacquire(*int32) func semrelease(*int32)
type Mutex struct { key int32 sema int32
func xadd(val *int32, delta int32) (new int32) { for { v := *val if cas(val, v, v+delta) { return v + delta } } panic("unreached") }
func (m *Mutex) Lock() { if xadd(&m.key, 1) == 1 { return } semacquire(&m.sema) } func (m *Mutex) Unlock() { if xadd(&m.key, -1) == 0 { return } semrelease(&m.sema) }
|
CAS指令
原子性的
将给定制和一个内存地址中的值进行比较,如果是同一值,就用新值替换内存地址中的值,这个操作是原子性的
原子性保证了指令总是基于最新的值进行计算,如果同时又其他线程修改了这个值,CAS返回失败
Mutex结构体包含了两个字段
- 字段key:flag,表示排外锁是否被goroutine持有
- 字段sema:信号变量,控制等待goroutine的阻塞休眠和唤醒
Unlock方法可以被任意的goroutine调用释放锁,即使是没持有这个互斥锁的goroutine,也可以进行这个操作。因为,Mutex本身并没有包含着这把锁的goroutine的信息,所以,Unlock也不会对此进行检查。
这是一个有趣而危险的功能!
goroutine可以强制释放锁,这是一个危险的操作,因为临界区的goroutine可能不知道锁已经被释放,而继续进行临界区的业务操作,goroutine以为自己持有锁,会造成data race问题。
在使用Mutex时,要保证goroutine不释放自己未持有的锁,遵循“谁申请,谁释放”的原则。
处于性能考虑,即使释放掉锁,在if-else分之加入释放锁,代码会很臃肿,重构时也会误删或漏掉而出现死锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| type Foo struct { mu sync.Mutex count int } func (f *Foo) Bar() { f.mu.Lock() if f.count < 1000 { f.count += 3 f.mu.Unlock() return } f.count++ f.mu.Unlock() return }
|
1.14 版本,对defer做了优化
1 2 3 4 5 6 7 8 9 10 11
| func (f *Foo) Bar() { f.mu.Lock() defer f.mu.Unlock() if f.count < 1000 { f.count += 3 return } f.count++ return }
|
初版的Mutex有一个问题:请求锁的goroutine会排队等待获取互斥锁。
给新人机会
2011.6.30 Mutex大调整
1 2 3 4 5 6 7 8 9
| type Mutex struct { state int32 sema uint32 } const ( mutexLocked = 1 << iota mutexWoken mutexWaiterShift = iota )
|
Mutex的结构体第一个字段改为了state
state是符合型的字段,包含多重含义
- 第一位表示锁是否被持有
- 第二位表示是否有唤醒的goroutine
- 剩余位数代表等待此锁的goroutine数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } awoke := false for { old := m.state new := old | mutexLocked if old&mutexLocked != 0 { new = old + 1<<mutexWaiterShift } if awoke { new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&mutexLocked == 0 { break } runtime.Semacquire(&m.sema) awoke = true } } }
|