为什么需要两层通道
第一层通道(syncChan):用于传递同步请求
类型:chan chan struct{} 作用:将”需要同步”这个事件通知给后台 goroutine
第二层通道(done):用于确认完成
类型:chan struct{} 作用:作为回调通知机制,让调用方知道操作何时完成
关键优势
完全异步:调用方不会被阻塞在 channel 发送操作上 精确同步:保证收到响应时数据确实已刷新 资源安全:使用关闭通道作为信号,没有内存泄漏风险
与其他方案的对比
// 错误方案示例
syncChan := make(chan struct{})
// 调用方 Sync方法
func (l *LogAsyncWriter) Sync() {
l.syncChan <- struct{}{}
// 如何知道何时完成?
}
// 接收方 processLoop
case <-l.syncChan:
l.buf.Flush()
// 无法通知调用方已完成
这时会出现: 调用方不知道操作何时完成 多个 Sync 调用会相互干扰 无法处理并发同步请求
// 调用方 Sync方法
func (l *LogAsyncWriter) Sync() {
done := make(chan struct{}) // 创建一个信号通道
l.syncChan <- done // 将这个信号通道发送到同步通道
<-done // 等待信号通道被关闭
}
// 接收方 processLoop
case <-l.syncChan:
l.buf.Flush()
// 无法通知调用方已完成
close(done)
类比现实场景
想象快递站的工作模式:
你(调用方)有一个紧急包裹必须立即发货(Sync) 你把包裹交给站长(syncChan)时说:”等这个包裹发走时请打这个电话(done)通知我” 站长将你的电话号码(done)放入紧急处理队列(syncChan)
快递员(processLoop)看到紧急请求时: 立即处理所有包裹(Flush) 拨打你的电话(close(done)) 你接到电话就知道包裹已发出
其他应用场景
这种模式在分布式系统中也很常见,比如: RPC 调用中的请求/响应模型 任务队列中的回调通知 异步操作的超时控制
RPC 调用模型(请求/响应)
package main
import (
"fmt"
"time"
)
type Request struct {
Data int
RespChan chan int // 响应通道
}
func main() {
// 创建RPC服务端
reqChan := make(chan Request)
go server(reqChan)
// 客户端调用
resp := client(reqChan, 42)
fmt.Println("Received response:", resp)
}
func client(ch chan<- Request, data int) int {
respChan := make(chan int)
req := Request{Data: data, RespChan: respChan}
ch <- req // 发送请求
// 等待响应
return <-respChan
}
func server(ch <-chan Request) {
for req := range ch {
go func(r Request) {
// 模拟处理耗时
time.Sleep(500 * time.Millisecond)
// 通过请求自带的通道返回结果
r.RespChan <- r.Data * 2
}(req)
}
}
任务队列回调
package main
import (
"fmt"
"math/rand"
"time"
)
type Task struct {
ID int
Result chan int // 结果通知通道
}
func main() {
// 创建任务队列
taskQueue := make(chan Task, 10)
// 启动3个工作者
for i := 1; i <= 3; i++ {
go worker(i, taskQueue)
}
// 提交5个任务
for i := 1; i <= 5; i++ {
resultChan := make(chan int)
taskQueue <- Task{ID: i, Result: resultChan}
// 异步接收结果
go func(id int, ch <-chan int) {
fmt.Printf("Task %d result: %d\n", id, <-ch)
}(i, resultChan)
}
time.Sleep(2 * time.Second)
}
func worker(id int, tasks <-chan Task) {
for task := range tasks {
// 模拟处理耗时
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
// 通过任务自带的通道返回结果
task.Result <- task.ID * 10
}
}
带超时的异步操作
package main
import (
"context"
"fmt"
"time"
)
type AsyncOp struct {
Data interface{}
Done chan struct{} // 完成通知通道
}
func main() {
opChan := make(chan AsyncOp)
// 启动异步处理器
go processAsync(opChan)
// 发起操作并设置超时
ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
defer cancel()
done := make(chan struct{})
opChan <- AsyncOp{Data: "重要操作", Done: done}
select {
case <-done:
fmt.Println("操作成功完成")
case <-ctx.Done():
fmt.Println("操作超时")
}
}
func processAsync(ch <-chan AsyncOp) {
op := <-ch
// 模拟耗时操作
time.Sleep(1 * time.Second)
close(op.Done) // 通知操作完成
}
关键设计模式总结:
通道嵌套的本质:通过传递通道来实现跨 goroutine 的通信协议
主要应用场景:
需要双向通信的异步操作
需要精确控制响应归属的并发系统
需要实现超时/取消机制的长时操作
优势分析:
解耦生产者与消费者 天然支持响应式编程 避免共享内存带来的并发问题 更细粒度的流程控制
性能优化技巧:
对响应通道进行对象池复用 设置合理的通道缓冲大小 及时关闭不再使用的通道 使用 select 实现优先级控制
实践联系
从简单示例开始,逐步添加以下功能进行练习:
给 RPC 示例添加重试机制 为任务队列实现进度通知(百分比) 在超时示例中添加取消功能 实现带优先级的请求处理队列