What chan chan struct{}

为什么需要两层通道

第一层通道(syncChan):用于传递同步请求

类型:chan chan struct{} 作用:将”需要同步”这个事件通知给后台 goroutine

第二层通道(done):用于确认完成

类型:chan struct{} 作用:作为回调通知机制,让调用方知道操作何时完成

关键优势

完全异步:调用方不会被阻塞在 channel 发送操作上 精确同步:保证收到响应时数据确实已刷新 资源安全:使用关闭通道作为信号,没有内存泄漏风险

与其他方案的对比

// 错误方案示例
syncChan := make(chan struct{})

// 调用方 Sync方法
func (l *LogAsyncWriter) Sync() {
    l.syncChan <- struct{}{}
    // 如何知道何时完成?
}

// 接收方 processLoop
case <-l.syncChan:
    l.buf.Flush()
    // 无法通知调用方已完成

这时会出现: 调用方不知道操作何时完成 多个 Sync 调用会相互干扰 无法处理并发同步请求


// 调用方 Sync方法
func (l *LogAsyncWriter) Sync() {
   done := make(chan struct{})  // 创建一个信号通道
   l.syncChan <- done           // 将这个信号通道发送到同步通道
    <-done                      // 等待信号通道被关闭
}

// 接收方 processLoop
case <-l.syncChan:
    l.buf.Flush()
    // 无法通知调用方已完成
    close(done) 

类比现实场景

想象快递站的工作模式:

你(调用方)有一个紧急包裹必须立即发货(Sync) 你把包裹交给站长(syncChan)时说:”等这个包裹发走时请打这个电话(done)通知我” 站长将你的电话号码(done)放入紧急处理队列(syncChan)

快递员(processLoop)看到紧急请求时: 立即处理所有包裹(Flush) 拨打你的电话(close(done)) 你接到电话就知道包裹已发出

其他应用场景

这种模式在分布式系统中也很常见,比如: RPC 调用中的请求/响应模型 任务队列中的回调通知 异步操作的超时控制

RPC 调用模型(请求/响应)

package main

import (
	"fmt"
	"time"
)

type Request struct {
	Data     int
	RespChan chan int // 响应通道
}

func main() {
	// 创建RPC服务端
	reqChan := make(chan Request)
	go server(reqChan)

	// 客户端调用
	resp := client(reqChan, 42)
	fmt.Println("Received response:", resp)
}

func client(ch chan<- Request, data int) int {
	respChan := make(chan int)
	req := Request{Data: data, RespChan: respChan}
	ch <- req // 发送请求
	// 等待响应
	return <-respChan
}

func server(ch <-chan Request) {
	for req := range ch {
		go func(r Request) {
			// 模拟处理耗时
			time.Sleep(500 * time.Millisecond)
			// 通过请求自带的通道返回结果
			r.RespChan <- r.Data * 2
		}(req)
	}
}

任务队列回调

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Task struct {
	ID     int
	Result chan int // 结果通知通道
}

func main() {
	// 创建任务队列
	taskQueue := make(chan Task, 10)

	// 启动3个工作者
	for i := 1; i <= 3; i++ {
		go worker(i, taskQueue)
	}

	// 提交5个任务
	for i := 1; i <= 5; i++ {
		resultChan := make(chan int)
		taskQueue <- Task{ID: i, Result: resultChan}

		// 异步接收结果
		go func(id int, ch <-chan int) {
			fmt.Printf("Task %d result: %d\n", id, <-ch)
		}(i, resultChan)
	}

	time.Sleep(2 * time.Second)
}

func worker(id int, tasks <-chan Task) {
	for task := range tasks {
		// 模拟处理耗时
		time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
		
		// 通过任务自带的通道返回结果
		task.Result <- task.ID * 10
	}
}

带超时的异步操作

package main

import (
	"context"
	"fmt"
	"time"
)

type AsyncOp struct {
	Data   interface{}
	Done   chan struct{} // 完成通知通道
}

func main() {
	opChan := make(chan AsyncOp)

	// 启动异步处理器
	go processAsync(opChan)

	// 发起操作并设置超时
	ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
	defer cancel()

	done := make(chan struct{})
	opChan <- AsyncOp{Data: "重要操作", Done: done}

	select {
	case <-done:
		fmt.Println("操作成功完成")
	case <-ctx.Done():
		fmt.Println("操作超时")
	}
}

func processAsync(ch <-chan AsyncOp) {
	op := <-ch
	// 模拟耗时操作
	time.Sleep(1 * time.Second)
	close(op.Done) // 通知操作完成
}

关键设计模式总结:

通道嵌套的本质:通过传递通道来实现跨 goroutine 的通信协议

主要应用场景:

需要双向通信的异步操作

需要精确控制响应归属的并发系统

需要实现超时/取消机制的长时操作

优势分析:

解耦生产者与消费者 天然支持响应式编程 避免共享内存带来的并发问题 更细粒度的流程控制

性能优化技巧:

对响应通道进行对象池复用 设置合理的通道缓冲大小 及时关闭不再使用的通道 使用 select 实现优先级控制

实践联系

从简单示例开始,逐步添加以下功能进行练习:

给 RPC 示例添加重试机制 为任务队列实现进度通知(百分比) 在超时示例中添加取消功能 实现带优先级的请求处理队列

Share: X (Twitter) Facebook LinkedIn