[TIPS] 有关 Read-Write Lock
Contents
关于读写锁死锁的问题
最近在写框架绑定服务提供者的时候,遇到了编译出错,出现了死锁。主要代码如下:
func (hade *HadeContainer) Bind(provider ServiceProvider) error {
hade.lock.Lock()
defer hade.lock.Unlock()
key := provider.Name()
hade.providers[key] = provider
// if provider is not defer
if provider.IsDefer() == false {
if err := provider.Boot(hade); err != nil {
return err
}
// 实例化方法
// 省略...
}
return nil
}
这段代码调用了 provider.Boot() 方法
func (provider *HadeEnvProvider) Boot(c framework.Container) error {
app := c.MustMake(contract.AppKey).(contract.App)
// 省略...
}
func (hade *HadeContainer) MustMake(key string) interface{} {
serv, err := hade.make(key, nil, false)
// 省略...
}
func (hade *HadeContainer) make(key string, params []interface{}, forceNew bool) (interface{}, error) {
hade.lock.RLock()
defer hade.lock.RUnlock()
// 查询是否已经注册了这个服务提供者,如果没有注册,则返回错误
sp := hade.findServiceProvider(key)
// 省略...
}
在 make 方法里调用了读锁,由于前面的写锁是 defer 的,这时候并没有释放,这时调用读锁,造成了死锁。
读写锁的数据结构
源码中 src/sync/rwmutex.go:RWMutex 定义了读写锁的数据结构
type RWMutex struct {
w Mutex // 用于控制多个写锁,获得写锁首先要获取该锁
writerSem uint32 // 写阻塞等待的信号量,最后一个读者释放锁时会释放信号量
readerSem uint32 // 读阻塞的协程等待的信号量,持有写锁释放锁后会释放信号量
readerCount32 int32 // 记录读者个数
readerWait int32 // 记录写阻塞时的读者个数
}
读写锁内部仍有一个互斥锁,用于将多个写操作隔离开来,其他几个字段用于隔离读操作和写操作。
RWMutex 提供了 4 个简单的接口
- RLock()
- RUnlock()
- Lock()
- Unlock()
Lock() 的实现逻辑
- 获取互斥锁
- 阻塞等待所有读操作结束(如果有的话)
Unlock() 的实现逻辑
- 唤醒因读锁定而被阻塞的协程(如果有的话)
- 解除互斥锁
RLock() 的实现逻辑
- 增加读操作计数,即 readerCount++
- 阻塞等待写操作结束(如果有的话)
RUnlock() 的实现逻辑
- 减少读操作计数,即 readerCount–
- 唤醒等待写操作的协程(如果有的话)
在读写锁中,写操作会阻塞写操作和读操作,读操作会阻塞写操作,但是读操作不会阻塞读操作。如果中间没有及时释放锁,就会造成死锁。
读写锁是如何相互阻塞的
- 写操作是依赖互斥锁来阻止其他写操作的
- 写操作将 readerCount 变成负值来阻止读操作 这是读写锁的精华
- 读操作增加 readerCount 的值来阻止写操作
- 写操作不会被“饿死”。写操作到来时,会把 readCount的值复制到 readerWait 中,用于标记在写操作前面的读者个数。读操作结束后,会递减 readerCount 的值,还会递减 readerWait 的值,readerWait 变为 0 时,唤醒写操作。
所以基于读写锁的特性,读写锁特别适合应用在具有一定并发量且读多写少的场合。 在大量并发读的情况下,多个 Goroutine 可以同时持有读锁,从而减少在锁竞争中等待的时间。
对上面代码的修改
知道了死锁的原因后,我们只要手动释放写锁即可,也侧面证明了 defer 很好,但是也不能乱用。代码如下:
func (hade *HadeContainer) Bind(provider ServiceProvider) error {
hade.lock.Lock()
key := provider.Name()
hade.providers[key] = provider
hade.lock.Unlock()
// if provider is not defer
if provider.IsDefer() == false {
if err := provider.Boot(hade); err != nil {
return err
}
// 实例化方法
// 省略...
}
return nil
}