将 Go 例程与通道和等待组同步
由 Mux 主办的 DEV 全球展示挑战赛:展示你的项目!
最近在调试一个莫名其妙卡死的 Go 函数时,我学到了如何使用 WaitGroups 和 Channels 来同步 Go 例程的新知识。继续阅读了解更多!
引言
作为一名相对较新的 Go 程序员,我最近被一个 bug 难住了,这个 bug 导致一个运行多个 Go 例程的函数无限期地挂起。在深入研究了 WaitGroups 和 Channels 的用法,并最终理解了如何正确使用这些工具来管理 Go 例程的行为之后,我成功解决了这个 bug。
在这篇文章中,我们将
- 检查有问题的代码,并分析其无法正常运行的原因。
- 通过一个简单的例子,逐步讲解同步 Go 例程的正确方法。
- 修复我们的漏洞!
阻塞性漏洞
在一个负责显示 GitHub 使用情况分析的应用程序中,我们有一个函数,MergeContributors它负责将属于同一人的两个 GitHub 用户帐户“合并”为一个帐户,以便 UI 能够准确反映他们对项目提交、拉取请求和合并的贡献。此“贡献者合并”操作包含以下内容:
- 更新分析引擎中存储的所有 Git 提交记录,以便将用户两个帐户中的所有提交记录正确关联到同一个主帐户。
- 更新分析引擎中所有已存储的拉取请求,以便将用户两个帐户中的所有拉取请求正确关联到同一个主帐户。
- 更新分析引擎中所有已存储的 PR 合并记录,以便将两个帐户中所有用户的合并记录正确关联到同一个主帐户。
- 最后,如果所有这些操作都成功了,则更新记录,表明这两个帐户已“合并”。
该函数出现以下故障:
- 如果在前三个步骤中的任何一个步骤中发生错误
- 然后该函数就会阻塞,永远挂起,永不返回。
糟糕!我们来看看原始代码,以便诊断问题所在:
func MergeCOntributors(primaryAccount, secondaryAccount) error {
// Create a WaitGroup to manage the goroutines.
var waitGroup sync.WaitGroup
c := make(chan error)
// Perform 3 concurrent transactions against the database.
waitGroup.Add(3)
go func() {
waitGroup.Wait()
close(c)
}()
// Transaction #1, merge "commit" records
go func() {
defer waitGroup.Done()
err := mergeCommits(primaryAccount, secondaryAccount)
if err != nil {
c <- err
}
}()
// Transaction #2, merge "pull request" records
go func() {
defer waitGroup.Done()
err := mergePullRequests(primaryAccount, secondaryAccount)
if err != nil {
c <- err
}
}()
// Transaction #3, merge "merge" records
go func() {
defer waitGroup.Done()
err := mergePullRequestMerges(primaryAccount, secondaryAccount)
if err != nil {
c <- err
}
}()
waitGroup.Wait()
for err := range c {
if err != nil {
return err
}
}
return markMerged(primaryAccount, secondaryAccount)
}
分解虫子
让我们一起来分析一下这里发生了什么:
- 我们建立了一个等待组,
waitGroup以及一个用于接收错误的通道。
var waitGroup sync.WaitGroup
c := make(chan error)
- 我们将 WaitGroup 计数增加到 3,因为我们将使用它来协调三个同步数据库事务 Go 例程。
waitGroup.Add(3)
- 我们启动一个独立的 Go 例程,该例程会阻塞,直到
waitGroup计数降回零。一旦解除阻塞,它就会关闭通道。
go func() {
waitGroup.Wait()
close(c)
}()
-
我们启动了三个并发的 Go 例程,每个例程对应一个数据库事务。每个 Go 例程运行一个函数,该函数负责发起数据库调用,必要时向通道发送错误,并
waitGroup在函数返回前递减计数。 -
然后,我们调用了
waitGroup.Wait()。这将阻塞主函数的执行,直到waitGroup的计数回到零。 -
在这个阻塞调用之后,我们将使用它
range来遍历发送到通道的消息。该调用range将持续监听通道中的消息,直到通道关闭。这是一个阻塞操作。 -
一旦通道被我们之前的 Go 例程关闭(该例程等待计数
waitGroup降至零,然后调用)close(c),对的调用range将停止监听通道,主函数将继续运行,如果通道收到错误则返回错误,否则继续执行最后一项工作,即对的调用markMerged。
你找到问题所在了吗?
理解问题
为了找出这个漏洞,我们需要了解一些关于通过通道发送和接收消息的工作原理。当我们通过通道发送消息时,发送消息的调用会阻塞,直到从通道中读取到该消息为止。
仔细查看我们的一个数据库事务 Go 例程:
// Transaction #1, merge "commit" records
go func() {
defer waitGroup.Done()
err := mergeCommits(primaryAccount, secondaryAccount)
if err != nil {
c <- err
}
}()
我们可以理解,向通道发送消息的那一行代码c <- error,会阻塞在 Go 例程中运行的匿名函数的执行,直到读取该消息为止。
我们在哪里阅读消息?通过我们的range通话,而这个通话是在拨打以下电话之后waitGroup.Wait()进行的:
...
waitGroup.Wait()
for err := range c {
if err != nil {
return err
}
}
return markMerged(primaryAccount, secondaryAccount)
问题在于:
- 匿名函数向通道发送一条消息,该通道会阻塞,直到消息被读取。
- 消息读取代码(即我们的调用)要等到计数降回零后才会运行,因为它是在
range同步调用之后执行的。waitGroupwaitGroup.Wait() - 由于消息还无法读取,因此在我们的 go 例程中运行的匿名函数无法完成运行——它不会调用 deferred
waitGroup.Done()。 - 这意味着
waitGroup计数永远不会回到零,反过来又意味着在我们调用waitGroup.Wait()之前对该计数的调用永远不会解除阻塞。range - 由于
waitGroup.Wait()永远不会解除阻塞,我们无法调用range,消息永远不会从通道中读取,我们又回到了原点——陷入了无限阻塞!
同步围棋程序的正确方法
为了防止这种阻塞,我们需要确保我们的调用能够在Go 例程运行时range运行并从通道读取消息。这将确保任何向通道发送消息的 Go 例程都不会阻塞于该发送操作,从而允许 Go 例程的匿名函数进行调用。waitGroup.Done()
我们来看一个简单的例子:
package main
import "fmt"
func main() {
// Create a WaitGroup to manage the goroutines.
var waitGroup sync.WaitGroup
c := make(chan string)
// Perform 3 concurrent transactions against the database.
waitGroup.Add(3)
go func() {
waitGroup.Wait()
close(c)
}()
go func() {
defer waitGroup.Done()
c <- "one"
}()
go func() {
defer waitGroup.Done()
c <- "two"
}()
go func() {
defer waitGroup.Done()
c <- "three"
}()
for str := range c {
fmt.Println(str)
}
}
让我们来详细分析一下。我们:
- 创建一个等待组,并将其数量设置为三。
- 建立一个可以接收字符串消息的通道
- 创建一个 Go 例程,该例程会等待直到
waitGroup计数为零,然后关闭通道。 - 创建三个独立的 Go 例程,每个例程都会向通道写入一条消息,并在读取该消息后将
waitGroup计数减一。 - 然后,在运行这些 Go 例程的同时,我们遍历通道,读取任何传入的消息并将其打印到 STDOUT。
- 由于我们的范围调用与 Go 例程同时运行,因此每个例程发送到通道的消息都会立即被读取。因此,每个例程中发送消息的调用不会长时间阻塞,并且每个例程的匿名函数都能调用该
waitGroup.Done()调用。 - 三个 Go 例程分别结束后,
waitGroup计数递减,第一个Go 例程(调用 的例程waitGroup.Wait())将解除阻塞并关闭通道。 - 通道关闭且所有消息读取完毕后,范围将停止监听消息,主功能将结束运行。
如果运行上面的代码,我们会发现该函数并没有错误地阻塞。相反,我们会看到以下输出成功打印出来:
one
two
three
既然我们已经了解了同步 Go 例程的正确方法,那就让我们来修复我们的错误吧!
修复漏洞
我们需要去掉第二个同步调用waitGroup.Wait()。这个调用会阻止从通道读取消息,进而阻止对的任何调用waitGroup.Done()。这就是我们函数阻塞的原因。
如果删除出错的那一行,剩下的就是:
func MergeCOntributors(primaryAccount, secondaryAccount) error {
// Create a WaitGroup to manage the goroutines.
var waitGroup sync.WaitGroup
c := make(chan error)
// Perform 3 concurrent transactions against the database.
waitGroup.Add(3)
go func() {
waitGroup.Wait()
close(c)
}()
// Transaction #1, merge "commit" records
go func() {
defer waitGroup.Done()
err := mergeCommits(primaryAccount, secondaryAccount)
if err != nil {
c <- err
}
}()
// Transaction #2, merge "pull request" records
go func() {
defer waitGroup.Done()
err := mergePullRequests(primaryAccount, secondaryAccount)
if err != nil {
c <- err
}
}()
// Transaction #3, merge "merge" records
go func() {
defer waitGroup.Done()
err := mergePullRequestMerges(primaryAccount, secondaryAccount)
if err != nil {
c <- err
}
}()
// This line is bad! Get rid of it!
// waitGroup.Wait()
for err := range c {
if err != nil {
return err
}
}
return markMerged(primaryAccount, secondaryAccount)
}
现在,我们要确保以下行为:
- 运行一个阻塞的 Go 例程,通过调用 `waitGroup` 函数
waitGroup.Wait(),直到 WaitGroup 计数降至零,此时关闭通道。 - 每个“DB事务”Go例程的匿名函数,如果出现错误,都会向通道发送一条消息。
- 该呼叫系统
range正在监听此类消息,并在消息到达时进行读取。 - 在每个“数据库事务”Go例程中,该函数会被解除阻塞,调用
waitGroup.Done()将会运行,从而使等待组的计数递减。 - 当 WaitGroup 的计数达到零时,第一个 Go 例程将通过调用来解除阻塞并关闭通道。
close(c) - 这将指示
range调用停止监听通道,从而停止阻塞,允许主函数继续执行。
结论
一个放错位置waitGroup.Wait()就能造成多大的破坏啊!关键在于,当你向一个通道发送消息时,它会阻塞,直到该消息被读取。为了成功同步我们的 Go 例程,我们需要确保从所有写入的通道中读取数据。
在我们遇到的这个 bug 中,我们错误地阻塞了函数的执行,导致通道中无法读取任何消息。通过移除多余的阻塞waitGroup.Wait(),我们确保了发送到通道的消息都能被读取,从而使通道waitGroup计数器递减。这反过来又确保了通道会被关闭,解除对该通道消息的阻塞,并允许主函数继续执行。
这个 bug 确实让我对 Go 例程同步有了更深入的了解,希望它对你也有帮助。
祝您编程愉快!
文章来源:https://dev.to/sophiedebenedetto/synchronizing-go-routines-with-channels-and-waitgroups-3ke2