Golang:优雅关闭
我们先来了解一下什么是优雅关闭。一般来说,优雅关闭是指以可控且有序的方式关闭应用程序的过程。它包括采取必要的步骤,以确保所有正在进行的操作都已完成、资源已正确释放,并且在完全终止之前数据完整性得到维护。
如果您使用的是 Go 的默认 HTTP 库作为服务器,您可能已经见过一个名为 Shutdown 的函数。该函数已经处理了优雅终止 HTTP 服务器所需的所有操作。更具体地说,正如文档所述,“Shutdown 的工作原理是:首先关闭所有打开的监听器,然后关闭所有空闲连接,接着无限期地等待连接恢复到空闲状态,最后关闭服务器。”
你可能会想,那我为什么还要读这篇文章呢?让我们来看一个例子。
PS:我必须说明,这并非“最佳实践”代码示例,仅用于概念说明。因此,出于显而易见的原因,应用层、接口抽象等内容都被省略了。
package main
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"os/signal"
"syscall"
"github.com/gorilla/mux"
)
func main() {
userService := UserService{
db: db{},
amqp: amqp{},
}
r := mux.NewRouter()
r.HandleFunc("/user", func(rw http.ResponseWriter, req *http.Request) {
name := "some name" // let's imagine we got it from the request
if err := userService.RegisterUser(req.Context(), name); err != nil {
rw.WriteHeader(http.StatusInternalServerError)
return
}
rw.WriteHeader(http.StatusOK)
}).Methods(http.MethodPost)
srv := http.Server{}
srv.Addr = ":8080"
srv.Handler = r
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("listen and serve returned err: %v", err)
}
}()
<-ctx.Done()
log.Println("got interruption signal")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Printf("server shutdown returned an err: %v\n", err)
}
log.Println("final")
}
type UserService struct {
amqp amqp
db db
}
func (s *UserService) RegisterUser(ctx context.Context, name string) error {
log.Println("start user registration")
userID, err := s.db.InsertUser(ctx, name)
if err != nil {
return fmt.Errorf("db insertion failed: %v", err)
}
go s.amqp.PublishUserInserted(ctx, userID)
return nil
}
type db struct{}
func (d db) InsertUser(ctx context.Context, name string) (int, error) {
log.Println("user insert")
return 1, nil
}
type amqp struct{}
func (a amqp) PublishUserInserted(ctx context.Context, id int) {
log.Println("message publish")
}
这里我们有一个简单的服务器,模拟用户注册流程。HTTP 服务器具有优雅关闭功能,一切看起来都很正常。我们尝试发起一个请求,然后中断一个进程。
2023/07/09 22:35:50 start user registration
2023/07/09 22:35:50 user insert
2023/07/09 22:35:50 message publish
^C
2023/07/09 22:35:52 got interruption signal
2023/07/09 22:35:52 final
数据库和 AMQP 都已被调用,然后我们触发了服务器关闭。可以看到,信号处理正常,HTTP 服务器的关闭也已成功执行。我们的代码有问题吗?让我们修改几行代码。
func (d db) InsertUser(ctx context.Context, name string) (int, error) {
time.Sleep(time.Second * 10)
log.Println("user insert")
return 1, nil
}
func (a amqp) PublishUserInserted(ctx context.Context, id int) {
time.Sleep(time.Second * 20)
log.Println("message publish")
}
在这里,我们将尝试模拟数据库插入和 AMQP 发布需要相当长的时间的情况。我们预期在日志中看到什么?
2023/07/09 22:43:22 start user registration
^C
2023/07/09 22:43:25 got interruption signal
2023/07/09 22:43:32 user insert
2023/07/09 22:43:32 final
如您所见,我们已优雅地关闭了数据库插入操作,但 AMQP 发布操作却丢失了。这是为什么呢?因为 AMQP 发布操作是在 goroutine 中执行的,它不受 HTTP 服务器关闭机制的控制,所以需要我们自行处理。
实现方法有很多种,我们来看其中一种。首先,这意味着我们需要自己控制 goroutine 的执行。让我们在服务结构中添加一个 waitgroup。
type UserService struct {
amqp amqp
db db
doneWG sync.WaitGroup
}
我们将在注册端点触发发布功能时使用它。让我们把发布操作放在一个单独的函数中。首先,我们将使用最近添加的等待组来控制 goroutine 的执行流程。其次,我们将添加一个恢复函数,以防止意外的 panic 传播到 goroutine 外部。
func (s *UserService) publishUserInserted(ctx context.Context, userID int) {
s.doneWG.Add(1)
go func() {
defer s.doneWG.Done()
defer func() {
if err := recover(); err != nil {
log.Printf("publishUserInserted recovered panic: %v\n", err)
}
}()
s.amqp.PublishUserInserted(ctx, userID)
}()
}
最后,我们需要添加一个停止函数,用于控制 goroutine 的执行结束,并在主函数中使用它。此外,我们还会处理一个上下文完成通道,以便在有截止时间或需要取消时跳过等待。
func (s *UserService) Stop(ctx context.Context) {
log.Println("waiting for user service to finish")
doneChan := make(chan struct{})
go func() {
s.doneWG.Wait()
close(doneChan)
}()
select {
case <-ctx.Done():
log.Println("context was marked as done earlier, than user service has stopped")
case <-doneChan:
log.Println("user service finished")
}
}
完整的代码如下所示:
package main
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"os/signal"
"sync"
"syscall"
"time"
"github.com/gorilla/mux"
)
func main() {
userService := UserService{
db: db{},
amqp: amqp{},
}
r := mux.NewRouter()
r.HandleFunc("/user", func(rw http.ResponseWriter, req *http.Request) {
name := "some name" // let's imagine we got it from the request
if err := userService.RegisterUser(req.Context(), name); err != nil {
rw.WriteHeader(http.StatusInternalServerError)
return
}
rw.WriteHeader(http.StatusOK)
}).Methods(http.MethodPost)
srv := http.Server{}
srv.Addr = ":8080"
srv.Handler = r
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("listen and serve returned err: %v", err)
}
}()
<-ctx.Done()
log.Println("got interruption signal")
if err := srv.Shutdown(context.TODO()); err != nil { // Use here context with a required timeout
log.Printf("server shutdown returned an err: %v\n", err)
}
userService.Stop(context.TODO()) // Use here context with a required timeout
log.Println("final")
}
type UserService struct {
amqp amqp
db db
doneWG sync.WaitGroup
}
func (s *UserService) Stop(ctx context.Context) {
log.Println("waiting for user service to finish")
doneChan := make(chan struct{})
go func() {
s.doneWG.Wait()
close(doneChan)
}()
select {
case <-ctx.Done():
log.Println("context done earlier then user service has stopped")
case <-doneChan:
log.Println("user service finished")
}
}
func (s *UserService) RegisterUser(ctx context.Context, name string) error {
log.Println("start user registration")
userID, err := s.db.InsertUser(ctx, name)
if err != nil {
return fmt.Errorf("db insertion failed: %v", err)
}
s.publishUserInserted(ctx, userID)
return nil
}
func (s *UserService) publishUserInserted(ctx context.Context, userID int) {
s.doneWG.Add(1)
go func() {
defer s.doneWG.Done()
defer func() {
if err := recover(); err != nil {
log.Printf("publishUserInserted recovered panic: %v\n", err)
}
}()
s.amqp.PublishUserInserted(ctx, userID)
}()
}
type db struct{}
func (d db) InsertUser(ctx context.Context, name string) (int, error) {
time.Sleep(time.Second * 10)
log.Println("user insert")
return 1, nil
}
type amqp struct{}
func (a amqp) PublishUserInserted(ctx context.Context, id int) {
time.Sleep(time.Second * 20)
log.Println("message publish")
}
让我们尝试执行一下,看看问题是否解决了。
2023/07/09 22:51:48 start user registration
^C
2023/07/09 22:51:49 got interruption signal
2023/07/09 22:51:58 user insert
2023/07/09 22:51:58 waiting for user service to finish
2023/07/09 22:52:18 message publish
2023/07/09 22:52:18 user service finished
2023/07/09 22:52:18 final
确实,正如您从日志中看到的,我们现在已经等待发布完成。通过这种方法,您可以自行扩展对停止功能的控制,例如,通过传递包含相应超时时间的上下文来添加您希望等待关闭的超时时间。请记住,忽略优雅关闭可能会导致数据不一致和各种数据丢失,因此务必重视这一点。
文章来源:https://dev.to/antonkuklin/golang-graceful-shutdown-3n6d