发布于 2026-01-06 6 阅读
0

将 Go 例程与通道和等待组同步 DEV 的全球展示挑战赛,由 Mux 呈现:展示你的项目!

将 Go 例程与通道和等待组同步

由 Mux 主办的 DEV 全球展示挑战赛:展示你的项目!

最近在调试一个莫名其妙卡死的 Go 函数时,我学到了如何使用 WaitGroups 和 Channels 来同步 Go 例程的新知识。继续阅读了解更多!

引言

作为一名相对较新的 Go 程序员,我最近被一个 bug 难住了,这个 bug 导致一个运行多个 Go 例程的函数无限期地挂起。在深入研究了 WaitGroups 和 Channels 的用法,并最终理解了如何正确使用这些工具来管理 Go 例程的行为之后,我成功解决了这个 bug。

在这篇文章中,我们将

  • 检查有问题的代码,并分析其无法正常运行的原因。
  • 通过一个简单的例子,逐步讲解同步 Go 例程的正确方法。
  • 修复我们的漏洞!

阻塞性漏洞

在一个负责显示 GitHub 使用情况分析的应用程序中,我们有一个函数,MergeContributors它负责将属于同一人的两个 GitHub 用户帐户“合并”为一个帐户,以便 UI 能够准确反映他们对项目提交、拉取请求和合并的贡献。此“贡献者合并”操作包含以下内容:

  • 更新分析引擎中存储的所有 Git 提交记录,以便将用户两个帐户中的所有提交记录正确关联到同一个主帐户。
  • 更新分析引擎中所有已存储的拉取请求,以便将用户两个帐户中的所有拉取请求正确关联到同一个主帐户。
  • 更新分析引擎中所有已存储的 PR 合并记录,以便将两个帐户中所有用户的合并记录正确关联到同一个主帐户。
  • 最后,如果所有这些操作都成功了,则更新记录,表明这两个帐户已“合并”。

该函数出现以下故障:

  • 如果在前三个步骤中的任何一个步骤中发生错误
  • 然后该函数就会阻塞,永远挂起,永不返回。

糟糕!我们来看看原始代码,以便诊断问题所在:

func MergeCOntributors(primaryAccount, secondaryAccount) error {
  // Create a WaitGroup to manage the goroutines.
    var waitGroup sync.WaitGroup
    c := make(chan error)

    // Perform 3 concurrent transactions against the database.
    waitGroup.Add(3)

    go func() {
        waitGroup.Wait()
        close(c)
    }()

    // Transaction #1, merge "commit" records
    go func() {
        defer waitGroup.Done()

        err := mergeCommits(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  // Transaction #2, merge "pull request" records
    go func() {
        defer waitGroup.Done()

        err := mergePullRequests(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  // Transaction #3, merge "merge" records
    go func() {
        defer waitGroup.Done()

        err := mergePullRequestMerges(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  waitGroup.Wait()

  for err := range c {
        if err != nil {
            return err
        }
  }

  return markMerged(primaryAccount, secondaryAccount)
}
Enter fullscreen mode Exit fullscreen mode

分解虫子

让我们一起来分析一下这里发生了什么:

  • 我们建立了一个等待组,waitGroup以及一个用于接收错误的通道。
var waitGroup sync.WaitGroup
c := make(chan error)
Enter fullscreen mode Exit fullscreen mode
  • 我们将 WaitGroup 计数增加到 3,因为我们将使用它来协调三个同步数据库事务 Go 例程。
waitGroup.Add(3)
Enter fullscreen mode Exit fullscreen mode
  • 我们启动一个独立的 Go 例程,该例程会阻塞,直到waitGroup计数降回零。一旦解除阻塞,它就会关闭通道。
go func() {
  waitGroup.Wait()
  close(c)
}()
Enter fullscreen mode Exit fullscreen mode
  • 我们启动了三个并发的 Go 例程,每个例程对应一个数据库事务。每个 Go 例程运行一个函数,该函数负责发起数据库调用,必要时向通道发送错误,并waitGroup在函数返回前递减计数。

  • 然后,我们调用了waitGroup.Wait()。这将阻塞主函数的执行,直到waitGroup的计数回到零。

  • 在这个阻塞调用之后,我们将使用它range来遍历发送到通道的消息。该调用range将持续监听通道中的消息,直到通道关闭。这是一个阻塞操作。

  • 一旦通道被我们之前的 Go 例程关闭(该例程等待计数waitGroup降至零,然后调用)close(c),对的调用range将停止监听通道,主函数将继续运行,如果通道收到错误则返回错误,否则继续执行最后一项工作,即对的调用markMerged

你找到问题所在了吗?

理解问题

为了找出这个漏洞,我们需要了解一些关于通过通道发送和接收消息的工作原理。当我们通过通道发送消息时,发送消息的调用会阻塞,直到从通道中读取到该消息为止

仔细查看我们的一个数据库事务 Go 例程:

// Transaction #1, merge "commit" records
go func() {
  defer waitGroup.Done()

  err := mergeCommits(primaryAccount, secondaryAccount)
  if err != nil {
    c <- err
  }
}()
Enter fullscreen mode Exit fullscreen mode

我们可以理解,向通道发送消息的那一行代码c <- error,会阻塞在 Go 例程中运行的匿名函数的执行,直到读取该消息为止

我们在哪里阅读消息?通过我们的range通话,而这个通话是在拨打以下电话之后waitGroup.Wait()进行的:

...
waitGroup.Wait()

for err := range c {
  if err != nil {
    return err
  }
}

return markMerged(primaryAccount, secondaryAccount)
Enter fullscreen mode Exit fullscreen mode

问题在于:

  • 匿名函数向通道发送一条消息,该通道会阻塞,直到消息被读取。
  • 消息读取代码(即我们的调用)要等到计数降回零后才会运行,因为它是range同步调用之后执行的。waitGroupwaitGroup.Wait()
  • 由于消息还无法读取,因此在我们的 go 例程中运行的匿名函数无法完成运行——它不会调用 deferred waitGroup.Done()
  • 这意味着waitGroup计数永远不会回到零,反过来又意味着在我们调用waitGroup.Wait()之前对该计数的调用永远不会解除阻塞range
  • 由于waitGroup.Wait()永远不会解除阻塞,我们无法调用range,消息永远不会从通道中读取,我们又回到了原点——陷入了无限阻塞!

同步围棋程序的正确方法

为了防止这种阻塞,我们需要确保我们的调用能够在Go 例程运行range运行并从通道读取消息。这将确保任何向通道发送消息的 Go 例程都不会阻塞于该发送操作,从而允许 Go 例程的匿名函数进行调用waitGroup.Done()

我们来看一个简单的例子:

package main

import "fmt"

func main() {
  // Create a WaitGroup to manage the goroutines.
    var waitGroup sync.WaitGroup
    c := make(chan string)

    // Perform 3 concurrent transactions against the database.
    waitGroup.Add(3)

    go func() {
        waitGroup.Wait()
        close(c)
    }()

    go func() {
        defer waitGroup.Done()

        c <- "one"
  }()

  go func() {
        defer waitGroup.Done()

        c <- "two"
  }()

  go func() {
        defer waitGroup.Done()

        c <- "three"
  }()

  for str := range c {
        fmt.Println(str)
  }
}
Enter fullscreen mode Exit fullscreen mode

让我们来详细分析一下。我们:

  • 创建一个等待组,并将其数量设置为三。
  • 建立一个可以接收字符串消息的通道
  • 创建一个 Go 例程,该例程会等待直到waitGroup计数为零,然后关闭通道。
  • 创建三个独立的 Go 例程,每个例程都会向通道写入一条消息,并在读取该消息后将waitGroup计数减一。
  • 然后,在运行这些 Go 例程的同时,我们遍历通道,读取任何传入的消息并将其打印到 STDOUT。
  • 由于我们的范围调用与 Go 例程同时运行,因此每个例程发送到通道的消息都会立即被读取。因此,每个例程中发送消息的调用不会长时间阻塞,并且每个例程的匿名函数都能调用该waitGroup.Done()调用。
  • 三个 Go 例程分别结束后,waitGroup计数递减,第一个Go 例程(调用 的例程waitGroup.Wait())将解除阻塞并关闭通道。
  • 通道关闭且所有消息读取完毕后,范围将停止监听消息,主功能将结束运行。

如果运行上面的代码,我们会发现该函数并没有错误地阻塞。相反,我们会看到以下输出成功打印出来:

one
two
three
Enter fullscreen mode Exit fullscreen mode

既然我们已经了解了同步 Go 例程的正确方法,那就让我们来修复我们的错误吧!

修复漏洞

我们需要去掉第二个同步调用waitGroup.Wait()。这个调用会阻止从通道读取消息,进而阻止对的任何调用waitGroup.Done()。这就是我们函数阻塞的原因。

如果删除出错的那一行,剩下的就是:

 func MergeCOntributors(primaryAccount, secondaryAccount) error {
  // Create a WaitGroup to manage the goroutines.
    var waitGroup sync.WaitGroup
    c := make(chan error)

    // Perform 3 concurrent transactions against the database.
    waitGroup.Add(3)

    go func() {
        waitGroup.Wait()
        close(c)
    }()

    // Transaction #1, merge "commit" records
    go func() {
        defer waitGroup.Done()

        err := mergeCommits(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  // Transaction #2, merge "pull request" records
    go func() {
        defer waitGroup.Done()

        err := mergePullRequests(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  // Transaction #3, merge "merge" records
    go func() {
        defer waitGroup.Done()

        err := mergePullRequestMerges(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  // This line is bad! Get rid of it!
  // waitGroup.Wait()

  for err := range c {
        if err != nil {
            return err
        }
  }

  return markMerged(primaryAccount, secondaryAccount)
}
Enter fullscreen mode Exit fullscreen mode

现在,我们要确保以下行为:

  • 运行一个阻塞的 Go 例程,通过调用 `waitGroup` 函数waitGroup.Wait(),直到 WaitGroup 计数降至零,此时关闭通道。
  • 每个“DB事务”Go例程的匿名函数,如果出现错误,都会向通道发送一条消息。
  • 该呼叫系统range正在监听此类消息,并在消息到达时进行读取。
  • 在每个“数据库事务”Go例程中,该函数会被解除阻塞,调用waitGroup.Done()将会运行,从而使等待组的计数递减。
  • 当 WaitGroup 的计数达到零时,第一个 Go 例程将通过调用来解除阻塞并关闭通道。close(c)
  • 这将指示range调用停止监听通道,从而停止阻塞,允许主函数继续执行。

结论

一个放错位置waitGroup.Wait()就能造成多大的破坏啊!关键在于,当你向一个通道发送消息时,它会阻塞,直到该消息被读取。为了成功同步我们的 Go 例程,我们需要确保从所有写入的通道中读取数据。

在我们遇到的这个 bug 中,我们错误地阻塞了函数的执行,导致通道中无法读取任何消息。通过移除多余的阻塞waitGroup.Wait(),我们确保了发送到通道的消息都能被读取,从而使通道waitGroup计数器递减。这反过来又确保了通道会被关闭,解除对该通道消息的阻塞,并允许主函数继续执行。

这个 bug 确实让我对 Go 例程同步有了更深入的了解,希望它对你也有帮助。

祝您编程愉快!

文章来源:https://dev.to/sophiedebenedetto/synchronizing-go-routines-with-channels-and-waitgroups-3ke2