简单的例子:
package main
import (
"fmt"
"sync"
)
func main() {
// 用于模拟从ETCD获取到的命名空间列表
names := []string{"ns1", "ns2", "ns3"}
// 用于分发命名空间名称
nameC := make(chan string)
// 用于接收处理后的命名空间
namespaceC := make(chan string)
var wg sync.WaitGroup
// 创建10个工作Goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for name := range nameC {
// 模拟从ETCD加载命名空间的过程
namespace := "loaded-" + name
namespaceC <- namespace
}
}()
}
// 另一个Goroutine负责分发所有命名空间名称到nameC通道,并等待所有工作Goroutine完成
go func() {
for _, name := range names {
nameC <- name
}
close(nameC)
wg.Wait()
close(namespaceC)
}()
// 收集从namespaceC通道接收到的所有命名空间模型,并存储在namespaceModels映射中
namespaceModels := make(map[string]string)
for namespace := range namespaceC {
namespaceModels[namespace] = "some value"
}
// 输出结果
fmt.Println("Collected namespaces:", namespaceModels)
}
package main
import (
"fmt"
"sync"
)
func main() {
// 用于模拟从ETCD获取到的命名空间列表
names := []string{"ns1", "ns2", "ns3"}
// 用于分发命名空间名称
nameC := make(chan string)
// 用于接收处理后的命名空间
namespaceC := make(chan string)
var wg sync.WaitGroup
// 创建10个工作Goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for name := range nameC {
// 模拟从ETCD加载命名空间的过程
namespace := "loaded-" + name
namespaceC <- namespace
}
}()
}
// 另一个Goroutine负责分发所有命名空间名称到nameC通道,并等待所有工作Goroutine完成
go func() {
for _, name := range names {
nameC <- name
}
close(nameC)
}()
go func() {
wg.Wait()
close(namespaceC)
}()
// 收集从namespaceC通道接收到的所有命名空间模型,并存储在namespaceModels映射中
namespaceModels := make(map[string]string)
for namespace := range namespaceC {
namespaceModels[namespace] = "some value"
}
// 输出结果
fmt.Println("Collected namespaces:", namespaceModels)
}
我们使用 sync.WaitGroup 是为了等待所有工作Goroutine完成它们的任务。 一旦 wg.Wait() 返回,我们就可以确信所有的工作Goroutine都完成了它们的工作,并且 namespaceC 通道中已经没有更多的消息要发送了。 于是,这时候关闭 namespaceC 是安全的。 将这段代码放在主Goroutine中也是完全可行的,但通常把这种逻辑放在一个单独的 Goroutine 是为了让主Goroutine可以继续执行其他任务(例如在本例中从 namespaceC 中读取数据)。
如果你把 wg.Wait() 和 close(namespaceC) 放在主Goroutine,并且在那之前没有从 namespaceC 读取数据,那么可能会造成死锁,因为工作Goroutine可能在尝试往 namespaceC 写数据,而没有其他Goroutine从该通道读取。所以,为了避免这种情况,我们通常会在一个单独的 Goroutine 中进行这一系列操作。
错误写法
package main
import (
"fmt"
"sync"
)
func main() {
// 用于模拟从ETCD获取到的命名空间列表
names := []string{"ns1", "ns2", "ns3"}
// 用于分发命名空间名称
nameC := make(chan string)
// 用于接收处理后的命名空间
namespaceC := make(chan string)
var wg sync.WaitGroup
// 创建10个工作Goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for name := range nameC {
// 模拟从ETCD加载命名空间的过程
namespace := "loaded-" + name
namespaceC <- namespace
}
}()
}
// 另一个Goroutine负责分发所有命名空间名称到nameC通道,并等待所有工作Goroutine完成
go func() {
for _, name := range names {
nameC <- name
}
close(nameC)
}()
wg.Wait()
close(namespaceC)
// 收集从namespaceC通道接收到的所有命名空间模型,并存储在namespaceModels映射中
namespaceModels := make(map[string]string)
for namespace := range namespaceC {
namespaceModels[namespace] = "some value"
}
// 输出结果
fmt.Println("Collected namespaces:", namespaceModels)
}
明确设计目标:
分发Goroutine:其职责是将所有需要处理的”namespace”名称分发到一个通道中。 工作Goroutine(10个):它们从通道接收”namespace”名称,进行一些模拟处理,然后将处理后的结果发送到另一个通道。
// 用于模拟从ETCD获取到的命名空间列表
names := []string{"ns1", "ns2", "ns3"}
数据流
“namespace”名称首先出现在一个名为names的切片。 分发Goroutine将这些名称发送到nameC通道。 工作Goroutine从nameC通道接收名称,进行处理,并将结果发送到namespaceC通道。 主Goroutine从namespaceC通道收集处理后的结果。
nameC := make(chan string)
namespaceC := make(chan string)
同步和互斥
使用sync.WaitGroup来等待所有工作Goroutine完成。 分发Goroutine在发送完所有名称后关闭nameC。 主Goroutine等待所有工作Goroutine完成后,再关闭namespaceC。
资源的创建和销毁
nameC和namespaceC是创建的资源,分发Goroutine和主Goroutine分别负责关闭它们。
避免死锁和活锁
工作Goroutine需要能从nameC读取数据,所以nameC必须在所有工作完成后关闭。 主Goroutine需要能从namespaceC读取数据,所以namespaceC必须在所有工作完成后关闭。