发布于 2026-01-06 3 阅读
0

使用 Go 构建微服务:RabbitMQ 的事件和后台作业

使用 Go 构建微服务:RabbitMQ 的事件和后台作业

原文发表于mariocarrion.com

什么是 RabbitMQ?

RabbitMQ是一款消息队列软件,也称为消息代理或队列管理器。它支持多种协议,例如AMQPMQTTSTOMP等。RabbitMQ 可用于长时间运行的任务(例如后台作业)以及不同服务之间的通信。

RabbitMQ 的工作原理是什么?

描述 RabbitMQ 最简单的类比就是邮局,它包含了从投递到最终送达邮件所需的所有步骤。在现实生活中,这些步骤包括:将邮件投入邮箱,然后在后台进行一些处理以规划邮件的路由,最后由邮递员将邮件送到目的地。

RabbitMQ 的工作原理是什么?1

RabbitMQ 的工作原理类似于一个队列,发布者可以在其中提交消息,这些消息最终会被多个消费者消费;然而,RabbitMQ 最有趣的地方在于它位于发布者和消费者之间的中间机制。这个中间机制被称为Exchange,它可以配置为定义绑定,从而允许将消息路由到不同的队列中,客户端可以通过不同的方式监听这些队列以消费消息,例如仅使用特定的键或通配符。

RabbitMQ 的工作原理是什么?2

使用存储库的发布者实现

本文使用的代码可在 Github 上找到

为了与 RabbitMQ 交互,我们将使用该软件包streadway/amqp,并且与其他数据存储类似,我们将定义一个存储库,该存储库将与实际的 RabbitMQ 发布者交互,并将在service类型中被调用。

此存储库类型将被命名为rabbitmq.Task ,它将包含一个未导出的字段,该字段引用 RabbitMQ 通道以及发出三个事件所需的相应方法CreatedDeletedUpdated

func (t *Task) Created(ctx context.Context, task internal.Task) error {
    return t.publish(ctx, "Task.Created", "tasks.event.created", task)
}

func (t *Task) Deleted(ctx context.Context, id string) error {
    return t.publish(ctx, "Task.Deleted", "tasks.event.deleted", id)
}

func (t *Task) Updated(ctx context.Context, task internal.Task) error {
    return t.publish(ctx, "Task.Updated", "tasks.event.updated", task)
}
Enter fullscreen mode Exit fullscreen mode

这三种方法都指向一个名为 `publish` 的未导出方法publish,该方法用于发布数据。该数据是使用该encoding/gob包对消息进行编码后的结果,类似于我们在讨论使用 Memcached 进行缓存时使用的代码:

func (t *Task) publish(ctx context.Context, spanName, routingKey string, e interface{}) error {
    // XXX: Excluding OpenTelemetry and error checking for simplicity
    var b bytes.Buffer

    _ = gob.NewEncoder(&b).Encode(e)

    _ = t.ch.Publish(
        "tasks",    // exchange
        routingKey, // routing key
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            AppId:       "tasks-rest-server",
            ContentType: "application/x-encoding-gob",
            Body:        b.Bytes(),
            Timestamp:   time.Now(),
        })

    return nil
}
Enter fullscreen mode Exit fullscreen mode

接下来,在service包中更新类型,使其能够使用接口类型接收该存储库的实例,然后在持久数据存储调用完成service.Task使用该实例,例如:

func (t *Task) Create(ctx context.Context, description string, priority internal.Priority, dates internal.Dates) (internal.Task, error) {
    // XXX: Excluding OpenTelemetry and error checking for simplicity
    task, _ := t.repo.Create(ctx, description, priority, dates)

    _ = t.msgBroker.Created(ctx, task)

    return task, nil
}
Enter fullscreen mode Exit fullscreen mode

请参考相关Delete通话Update记录了解更多详情,实际上代码与上面的代码类似。

现在,我们来看一下订阅者的实现。在这个例子中,我们将实现一个新的运行进程来负责消费这些数据。

订阅者实现

这个新流程将利用 RabbitMQ 事件来正确地索引任务记录,从而改变我们最初使用Elasticsearch 的方式,它还将支持像我们之前介绍的那样优雅地关闭

目前监听的代码是一个稍长的方法,其中最重要的部分是 RabbitMQ 返回的实际 Go 通道,这段代码执行类似以下的操作来接收所有事件:

// XXX: Excluding some things for simplicity, please refer to the original code
for msg := range msgs {
    switch msg.RoutingKey {
    case "tasks.event.updated", "tasks.event.created":
        // decode received Task event
        // call Elasticsearch to index record
    case "tasks.event.deleted":
        // decode received Task event
        // call Elasticsearch to delete record
    }
    // acknowledege received event
}
Enter fullscreen mode Exit fullscreen mode

在实际应用中,你应该考虑实现一个能够处理不同事件的服务器类型,或许可以像 Muxer 那样net/http.Server工作,并定义类似 Muxer 的东西,以便监听多个事件及其相应的编码/解码逻辑。

结论

RabbitMQ通常被称为分布式队列,但它也可以用作消息代理来通信多个服务,它是一个强大的工具,由于其可用的配置选项,可以大规模地向多个客户端传递消息。

推荐阅读

如果您想在 Kafka 和 Redis 中实现类似的功能,我建议您阅读以下链接:


本文包含由 Flaticon 的 itim2101 制作图标

文章来源:https://dev.to/mariocarrion/building-microservices-in-go-events-and-background-jobs-using-rabbitmq-2gpf