Go 语言完全指南 / 15 - 同步原语:Mutex、RWMutex、WaitGroup、Once、Pool
15 - 同步原语
15.1 sync.Mutex(互斥锁)
package main
import (
"fmt"
"sync"
)
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
func main() {
counter := &SafeCounter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("计数:", counter.Value()) // 1000
}
⚠️ 注意:
sync.Mutex不可复制(复制后锁状态不一致)- 永远用
defer mu.Unlock()确保解锁 - 不要在持锁时调用可能阻塞的函数
15.2 sync.RWMutex(读写锁)
读写锁允许多个读操作并发,但写操作互斥。
package main
import (
"fmt"
"sync"
"time"
)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func NewCache() *Cache {
return &Cache{data: make(map[string]string)}
}
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock() // 读锁
defer c.mu.RUnlock()
val, ok := c.data[key]
return val, ok
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁
defer c.mu.Unlock()
c.data[key] = value
}
func (c *Cache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.data, key)
}
func (c *Cache) Len() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.data)
}
func main() {
cache := NewCache()
var wg sync.WaitGroup
// 并发写入
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
cache.Set(fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i))
}(i)
}
// 并发读取
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
if val, ok := cache.Get(fmt.Sprintf("key-%d", i)); ok {
_ = val
}
}(i)
}
wg.Wait()
fmt.Println("缓存大小:", cache.Len())
}
| 锁类型 | 读操作 | 写操作 | 适用场景 |
|---|---|---|---|
Mutex | 互斥 | 互斥 | 通用 |
RWMutex | 并发 | 互斥 | 读多写少 |
15.3 sync.WaitGroup
import (
"fmt"
"sync"
"time"
)
func processTask(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("任务 %d 开始\n", id)
time.Sleep(time.Duration(id*100) * time.Millisecond)
fmt.Printf("任务 %d 完成\n", id)
}
func main() {
var wg sync.WaitGroup
tasks := 5
wg.Add(tasks) // 一次添加多个
for i := 1; i <= tasks; i++ {
go processTask(i, &wg)
}
wg.Wait()
fmt.Println("所有任务完成")
}
💡 技巧:wg.Add() 应在启动 goroutine 之前调用,避免竞态条件。
15.4 sync.Once
确保函数只执行一次(如单例、初始化)。
package main
import (
"fmt"
"sync"
)
type Database struct {
Host string
Port int
}
var (
dbInstance *Database
dbOnce sync.Once
)
func GetDatabase() *Database {
dbOnce.Do(func() {
fmt.Println("初始化数据库连接(只执行一次)")
dbInstance = &Database{
Host: "localhost",
Port: 5432,
}
})
return dbInstance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
db := GetDatabase()
_ = db
}()
}
wg.Wait()
// 输出: "初始化数据库连接(只执行一次)" 只出现一次
}
15.5 sync.Pool
对象池,减少频繁的内存分配和 GC 压力。
package main
import (
"bytes"
"fmt"
"sync"
)
// 创建 buffer 池
var bufferPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
func processRequest(data string) string {
// 从池中获取
buf := bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset() // 清空
bufferPool.Put(buf) // 归还
}()
buf.WriteString("处理: ")
buf.WriteString(data)
return buf.String()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
result := processRequest(fmt.Sprintf("请求-%d", i))
_ = result
}(i)
}
wg.Wait()
fmt.Println("完成")
}
| 特性 | 说明 |
|---|---|
| 线程安全 | ✅ 可在多 goroutine 中使用 |
| 自动清理 | GC 时可能清除池中对象 |
| 适用场景 | 频繁创建和销毁的临时对象 |
| 不适用 | 需要持久化的对象 |
15.6 sync.Map
// 第九章已详细介绍,此处简要对比
var m sync.Map
// 存储
m.Store("key", "value")
// 读取
v, ok := m.Load("key")
// 原子操作
v, loaded := m.LoadOrStore("key", "default")
15.7 sync.Cond
条件变量,用于 goroutine 间的等待/通知。
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
items []int
mu sync.Mutex
cond *sync.Cond
}
func NewQueue() *Queue {
q := &Queue{}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Put(item int) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, item)
q.cond.Signal() // 通知一个等待者
}
func (q *Queue) Get() int {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // 等待通知
}
item := q.items[0]
q.items = q.items[1:]
return item
}
func main() {
q := NewQueue()
// 消费者
go func() {
for i := 0; i < 5; i++ {
item := q.Get()
fmt.Printf("消费: %d\n", item)
}
}()
// 生产者
for i := 1; i <= 5; i++ {
time.Sleep(200 * time.Millisecond)
fmt.Printf("生产: %d\n", i)
q.Put(i)
}
time.Sleep(time.Second)
}
15.8 sync/atomic
原子操作,比锁更高效的并发安全操作。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
// 原子计数器
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
fmt.Println("计数:", atomic.LoadInt64(&counter)) // 1000
// 原子比较并交换(CAS)
var value int64 = 100
swapped := atomic.CompareAndSwapInt64(&value, 100, 200)
fmt.Println(swapped, value) // true, 200
// 原子值
var config atomic.Value
config.Store(map[string]string{"env": "dev"})
go func() {
time.Sleep(time.Second)
config.Store(map[string]string{"env": "prod"})
}()
v := config.Load().(map[string]string)
fmt.Println(v["env"])
}
| 函数 | 说明 |
|---|---|
AddInt64 | 原子加 |
LoadInt64 | 原子读 |
StoreInt64 | 原子写 |
CompareAndSwapInt64 | CAS 操作 |
SwapInt64 | 原子交换 |
atomic.Value | 原子存储任意类型 |
🏢 业务场景
- 计数器/统计:
atomic实现高性能请求计数 - 连接池:
sync.Pool复用 HTTP 连接/Buffer - 缓存:
RWMutex保护读多写少的缓存 - 配置热更新:
atomic.Value无锁读取配置 - 初始化:
sync.Once确保单例只初始化一次