使用 PostgreSQL、Kafka、MongoDB 和 ElasticSearch 实现事件溯源和 CQRS 👋✨💫
👨💻 已使用物品清单:
PostgreSQL作为事件存储数据库,
Kafka作为消息代理,
gRPC是 Go 语言实现的 gRPC ,
Jaeger 是一个开源的端到端分布式追踪工具,
Prometheus提供监控和告警功能,
Grafana用于构建包含 Prometheus 和 MongoDB 数据的可观测性仪表盘
, MongoDB 是一个数据库
,Elasticsearch是一个 Go 语言的 Elasticsearch 客户端。Echo
是一个Web 框架
,Kibana是一个用于 Elasticsearch 的数据可视化仪表盘软件,
Migrate用于数据迁移。
源代码可在GitHub 仓库中找到。
本项目的主要思路是使用 Go、PostgreSQL 和 Kafka 实现事件溯源和 CQRS,并使用 MongoDB 和 ElasticSearch 进行读取投影。之前我曾写过类似的文章,使用Go、EventStoreDB和 Spring实现
了相同的微服务。正如 之前所述,我认为 EventStoreDB 是事件溯源的最佳选择,但在实际项目中,我们通常会受到一些业务限制,例如 EventStoreDB 的使用可能不被允许。在这种情况下,PostgreSQL 和 Kafka 是实现自定义事件存储的良好替代方案。 如果您不熟悉事件溯源和 CQRS 模式,microservices.io 是最佳的学习资源。EventStoreDB网站的博客 和文档也非常出色, 强烈推荐Alexey Zimarev 的《.NET Core 领域驱动设计实战》一书。
本项目使用PostgreSQL和Kafka实现事件存储的微服务,并使用MongoDB和Elasticsearch
作为投影的读取数据库。 本文中的一些描述与之前重复,因为之前也有使用 PostgreSQL 和 Kafka 的实现,但思路相同。
所有用户界面都将通过以下端口提供:
Jaeger UI:http://localhost:16686
Prometheus 用户界面:http://localhost:9090
Grafana 用户界面:http://localhost:3005
此项目的 Docker Compose 文件:
version: "3.9"
services:
es_postgesql:
image: postgres:14.4
container_name: es_postgesql
expose:
- "5432"
ports:
- "5432:5432"
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=bank_accounts
- POSTGRES_HOST=5432
command: -p 5432
volumes:
- ./microservices_pgdata:/var/lib/postgresql/data
networks: [ "microservices" ]
zookeeper:
image: 'bitnami/zookeeper:3.8.0'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
volumes:
- "./zookeeper:/zookeeper"
networks: [ "microservices" ]
kafka:
image: 'bitnami/kafka:3.2.0'
ports:
- "9092:9092"
- "9093:9093"
volumes:
- "./kafka_data:/bitnami"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
networks: [ "microservices" ]
mongodb:
image: mongo:latest
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin
MONGODB_DATABASE: products
ports:
- "27017:27017"
volumes:
- ./mongodb_data_container:/data/db
networks: [ "microservices" ]
jaeger:
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.35
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"
- "14250:14250"
- "9411:9411"
networks: [ "microservices" ]
node01:
image: docker.elastic.co/elasticsearch/elasticsearch:8.2.3
container_name: node01
restart: always
environment:
- node.name=node01
- cluster.name=es-cluster-8
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.license.self_generated.type=basic
- xpack.security.enabled=false
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- ./es-data01:/usr/share/elasticsearch/data
ports:
- "9200:9200"
- "9300:9300"
networks: [ "microservices" ]
kibana:
image: docker.elastic.co/kibana/kibana:8.2.3
restart: always
environment:
ELASTICSEARCH_HOSTS: http://node01:9200
ports:
- "5601:5601"
depends_on:
- node01
networks: [ "microservices" ]
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
command:
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
networks: [ "microservices" ]
node_exporter:
container_name: node_exporter_container
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks: [ "microservices" ]
grafana:
container_name: grafana_container
restart: always
image: grafana/grafana
ports:
- '3005:3000'
networks: [ "microservices" ]
volumes:
mongodb_data_container:
es-data01:
driver: local
networks:
microservices:
name: microservices
在事件溯源中,我们会存储实体发生的所有操作的历史记录,并从中推导出实体的状态。
我们可以回溯读取这些历史记录,从而确定实体在特定时间点的状态。
这是一种将数据作为事件存储在仅追加日志中的模式。
每个新事件都是一次变更。AggregateBase应该跟踪命令执行流程中发生的所有变更,以便 我们可以将这些变更持久化到命令处理程序中。
聚合会获取当前状态,验证特定操作的业务规则 ,并应用返回新状态的业务逻辑。此过程的关键在于“要么全部保存,要么全部不保存”。所有 聚合数据都必须成功保存。如果任何一条规则或操作失败,则整个状态变更将被拒绝。AggregateRoot 可以通过不同的方式实现,其主要方法是加载事件、应用和引发变更。 当我们从数据库中获取聚合时, 我们不会将其状态作为表或文档中的一条记录读取,而是读取之前保存的所有事件,并对每个事件调用when方法。 完成所有这些步骤后,我们将恢复给定聚合的所有历史记录。 通过这种方式,我们将聚合更新到其最新状态。
// AggregateRoot contains all methods of AggregateBase
type AggregateRoot interface {
GetID() string
SetID(id string) *AggregateBase
GetType() AggregateType
SetType(aggregateType AggregateType)
GetChanges() []any
ClearChanges()
GetVersion() uint64
ToSnapshot()
String() string
Load
Apply
RaiseEvent
}
// AggregateType type of the Aggregate
type AggregateType string
// AggregateBase base aggregate contains all main necessary fields
type AggregateBase struct {
ID string
Version uint64
Changes []any
Type AggregateType
when when
}
func NewAggregateBase(when when) *AggregateBase {
if when == nil {
return nil
}
return &AggregateBase{
Version: startVersion,
Changes: make([]any, 0, changesEventsCap),
when: when,
}
}
// ClearChanges clear AggregateBase uncommitted Event's
func (a *AggregateBase) ClearChanges() {
a.Changes = make([]any, 0, changesEventsCap)
}
// GetChanges get AggregateBase uncommitted Event's
func (a *AggregateBase) GetChanges() []any {
return a.Changes
}
// Load add existing events from event store to aggregate using When interface method
func (a *AggregateBase) Load(events []any) error {
for _, evt := range events {
if err := a.when(evt); err != nil {
return err
}
a.Version++
}
return nil
}
// Apply push event to aggregate uncommitted events using When method
func (a *AggregateBase) Apply(event any) error {
if err := a.when(event); err != nil {
return err
}
a.Version++
a.Changes = append(a.Changes, event)
return nil
}
// RaiseEvent push event to aggregate applied events using When method, used for load directly from eventstore
func (a *AggregateBase) RaiseEvent(event any) error {
if err := a.when(event); err != nil {
return err
}
a.Version++
return nil
}
事件代表领域中发生的事实。它们是真理的来源,当前状态源自于事件。
事件是不可变的,代表着业务事实。
在事件溯源中,对聚合数据执行的每一次操作都应该产生一个新的事件。
事件代表领域中发生的事实。它们是真理的来源,当前状态源自于事件。事件
是不可变的,代表着业务事实。
这意味着我们永远不会更改或删除数据库中的任何内容,我们只会追加新的事件。
type Event struct {
EventID string
AggregateID string
EventType EventType
AggregateType AggregateType
Version uint64
Data []byte
Metadata []byte
Timestamp time.Time
}
快照代表了某个“时间点”的当前状态。
如果我们严格按照事件溯源模式来操作,就需要获取所有这些交易记录来计算当前账户余额。
这样做效率很低。为了提高效率,你可能首先想到的是将最新状态缓存起来。
与其检索所有这些事件,我们只需检索一条记录并将其用于业务逻辑即可。这就是快照。
其基本逻辑是:读取快照(如果存在),然后从事件存储中读取事件;
如果存在快照,则读取自上次创建快照的流修订版本以来的事件;否则,读取所有事件。
在我们的微服务中,我们存储了每 N 个事件的快照。
如果性能足够好,则可能不需要快照。
type Snapshot struct {
ID string `json:"id"`
Type AggregateType `json:"type"`
State []byte `json:"state"`
Version uint64 `json:"version"`
}
事件存储是系统的关键组成部分。域中发生的每一次变更都会被记录在数据库中。
它专门用于存储变更历史记录,状态由仅追加的事件日志表示。
事件是不可变的:它们不能被修改。AggregateStore
的实现包括 Load、Save 和 Exists 方法。Load
和 Save 方法接受聚合,然后使用 EventStoreDB 客户端加载或应用事件。Load
方法会查找聚合的流名称,从聚合流中读取所有事件,
遍历所有事件,并为每个事件调用 RaiseEvent 处理程序。Save
方法通过保存变更历史记录来持久化聚合,并处理并发。
当您从 EventStore 中检索流时,会记录当前版本号,
然后在保存时,可以确定在此期间是否有人修改了该记录。
// Load es.Aggregate events using snapshots with given frequency
func (p *pgEventStore) Load(ctx context.Context, aggregate Aggregate) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Load")
defer span.Finish()
span.LogFields(log.String("aggregate", aggregate.String()))
snapshot, err := p.GetSnapshot(ctx, aggregate.GetID())
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return tracing.TraceWithErr(span, err)
}
if snapshot != nil {
if err := serializer.Unmarshal(snapshot.State, aggregate); err != nil {
p.log.Errorf("(Load) serializer.Unmarshal err: %v", err)
return tracing.TraceWithErr(span, errors.Wrap(err, "json.Unmarshal"))
}
err := p.loadAggregateEventsByVersion(ctx, aggregate)
if err != nil {
return err
}
p.log.Debugf("(Load Aggregate By Version) aggregate: %s", aggregate.String())
span.LogFields(log.String("aggregate with events", aggregate.String()))
return nil
}
err = p.loadEvents(ctx, aggregate)
if err != nil {
return err
}
p.log.Debugf("(Load Aggregate): aggregate: %s", aggregate.String())
span.LogFields(log.String("aggregate with events", aggregate.String()))
return nil
}
// Save es.Aggregate events using snapshots with given frequency
func (p *pgEventStore) Save(ctx context.Context, aggregate Aggregate) (err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Save")
defer span.Finish()
span.LogFields(log.String("aggregate", aggregate.String()))
if len(aggregate.GetChanges()) == 0 {
p.log.Debug("(Save) aggregate.GetChanges()) == 0")
span.LogFields(log.Int("events", len(aggregate.GetChanges())))
return nil
}
tx, err := p.db.Begin(ctx)
if err != nil {
p.log.Errorf("(Save) db.Begin err: %v", err)
return tracing.TraceWithErr(span, errors.Wrap(err, "db.Begin"))
}
defer func() {
if tx != nil {
if txErr := tx.Rollback(ctx); txErr != nil && !errors.Is(txErr, pgx.ErrTxClosed) {
err = txErr
tracing.TraceErr(span, err)
return
}
}
}()
changes := aggregate.GetChanges()
events := make([]Event, 0, len(changes))
for i := range changes {
event, err := p.serializer.SerializeEvent(aggregate, changes[i])
if err != nil {
p.log.Errorf("(Save) serializer.SerializeEvent err: %v", err)
return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.SerializeEvent"))
}
events = append(events, event)
}
if err := p.saveEventsTx(ctx, tx, events); err != nil {
return tracing.TraceWithErr(span, errors.Wrap(err, "saveEventsTx"))
}
if aggregate.GetVersion()%p.cfg.SnapshotFrequency == 0 {
aggregate.ToSnapshot()
if err := p.saveSnapshotTx(ctx, tx, aggregate); err != nil {
return tracing.TraceWithErr(span, errors.Wrap(err, "saveSnapshotTx"))
}
}
if err := p.processEvents(ctx, events); err != nil {
return tracing.TraceWithErr(span, errors.Wrap(err, "processEvents"))
}
p.log.Debugf("(Save Aggregate): aggregate: %s", aggregate.String())
span.LogFields(log.String("aggregate with events", aggregate.String()))
return tx.Commit(ctx)
}
对于事件的序列化和反序列化,我们需要实现 Serializer 接口:
type Serializer interface {
SerializeEvent(aggregate Aggregate, event any) (Event, error)
DeserializeEvent(event Event) (any, error)
}
银行账户汇总的实现:
type eventSerializer struct {
}
func NewEventSerializer() *eventSerializer {
return &eventSerializer{}
}
func (s *eventSerializer) SerializeEvent(aggregate es.Aggregate, event any) (es.Event, error) {
eventBytes, err := serializer.Marshal(event)
if err != nil {
return es.Event{}, errors.Wrapf(err, "serializer.Marshal aggregateID: %s", aggregate.GetID())
}
switch evt := event.(type) {
case *events.BankAccountCreatedEventV1:
return es.NewEvent(aggregate, events.BankAccountCreatedEventType, eventBytes, evt.Metadata), nil
case *events.BalanceDepositedEventV1:
return es.NewEvent(aggregate, events.BalanceDepositedEventType, eventBytes, evt.Metadata), nil
case *events.BalanceWithdrawnEventV1:
return es.NewEvent(aggregate, events.BalanceWithdrawnEventType, eventBytes, evt.Metadata), nil
case *events.EmailChangedEventV1:
return es.NewEvent(aggregate, events.EmailChangedEventType, eventBytes, evt.Metadata), nil
default:
return es.Event{}, errors.Wrapf(ErrInvalidEvent, "aggregateID: %s, type: %T", aggregate.GetID(), event)
}
}
func (s *eventSerializer) DeserializeEvent(event es.Event) (any, error) {
switch event.GetEventType() {
case events.BankAccountCreatedEventType:
return deserializeEvent(event, new(events.BankAccountCreatedEventV1))
case events.BalanceDepositedEventType:
return deserializeEvent(event, new(events.BalanceDepositedEventV1))
case events.BalanceWithdrawnEventType:
return deserializeEvent(event, new(events.BalanceWithdrawnEventV1))
case events.EmailChangedEventType:
return deserializeEvent(event, new(events.EmailChangedEventV1))
default:
return nil, errors.Wrapf(ErrInvalidEvent, "type: %s", event.GetEventType())
}
}
接下来,我们来创建一个银行账户聚合:
const (
BankAccountAggregateType es.AggregateType = "BankAccount"
)
type BankAccountAggregate struct {
*es.AggregateBase
BankAccount *BankAccount
}
func NewBankAccountAggregate(id string) *BankAccountAggregate {
if id == "" {
return nil
}
bankAccountAggregate := &BankAccountAggregate{BankAccount: NewBankAccount(id)}
aggregateBase := es.NewAggregateBase(bankAccountAggregate.When)
aggregateBase.SetType(BankAccountAggregateType)
aggregateBase.SetID(id)
bankAccountAggregate.AggregateBase = aggregateBase
return bankAccountAggregate
}
func (a *BankAccountAggregate) When(event any) error {
switch evt := event.(type) {
case *events.BankAccountCreatedEventV1:
a.BankAccount.Email = evt.Email
a.BankAccount.Address = evt.Address
a.BankAccount.Balance = evt.Balance
a.BankAccount.FirstName = evt.FirstName
a.BankAccount.LastName = evt.LastName
a.BankAccount.Status = evt.Status
return nil
case *events.BalanceDepositedEventV1:
return a.BankAccount.DepositBalance(evt.Amount)
case *events.BalanceWithdrawnEventV1:
return a.BankAccount.WithdrawBalance(evt.Amount)
case *events.EmailChangedEventV1:
a.BankAccount.Email = evt.Email
return nil
default:
return errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "event: %#v", event)
}
}
func (a *BankAccountAggregate) CreateNewBankAccount(ctx context.Context, email, address, firstName, lastName, status string, amount int64) error {
span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.CreateNewBankAccount")
defer span.Finish()
span.LogFields(log.String("AggregateID", a.GetID()))
if amount < 0 {
return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
}
metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
if err != nil {
return errors.Wrap(err, "serializer.Marshal")
}
event := &events.BankAccountCreatedEventV1{
Email: email,
Address: address,
FirstName: firstName,
LastName: lastName,
Balance: money.New(amount, money.USD),
Status: status,
Metadata: metaDataBytes,
}
return a.Apply(event)
}
func (a *BankAccountAggregate) DepositBalance(ctx context.Context, amount int64, paymentID string) error {
span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.DepositBalance")
defer span.Finish()
span.LogFields(log.String("AggregateID", a.GetID()))
if amount <= 0 {
return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
}
metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
if err != nil {
return errors.Wrap(err, "serializer.Marshal")
}
event := &events.BalanceDepositedEventV1{
Amount: amount,
PaymentID: paymentID,
Metadata: metaDataBytes,
}
return a.Apply(event)
}
func (a *BankAccountAggregate) WithdrawBalance(ctx context.Context, amount int64, paymentID string) error {
span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.WithdrawBalance")
defer span.Finish()
span.LogFields(log.String("AggregateID", a.GetID()))
if amount <= 0 {
return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
}
balance, err := money.New(a.BankAccount.Balance.Amount(), money.USD).Subtract(money.New(amount, money.USD))
if err != nil {
return errors.Wrapf(err, "Balance.Subtract amount: %d", amount)
}
if balance.IsNegative() {
return errors.Wrapf(bankAccountErrors.ErrNotEnoughBalance, "amount: %d", amount)
}
metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
if err != nil {
return errors.Wrap(err, "serializer.Marshal")
}
event := &events.BalanceWithdrawnEventV1{
Amount: amount,
PaymentID: paymentID,
Metadata: metaDataBytes,
}
return a.Apply(event)
}
func (a *BankAccountAggregate) ChangeEmail(ctx context.Context, email string) error {
span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.ChangeEmail")
defer span.Finish()
span.LogFields(log.String("AggregateID", a.GetID()))
metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
if err != nil {
return errors.Wrap(err, "serializer.Marshal")
}
event := &events.EmailChangedEventV1{Email: email, Metadata: metaDataBytes}
return a.Apply(event)
}
我们的微服务接受 HTTP 和 gRPC 请求:
银行账户 REST 控制器接受请求,使用验证器验证请求,
然后调用命令或查询服务。CQRS
之所以流行,主要原因在于它能够
分别处理读取和写入操作,因为这两种操作的优化技术差异巨大。Go
的 http 框架使用了echo。
HTTP 处理程序:
func (h *bankAccountHandlers) CreateBankAccount() echo.HandlerFunc {
return func(c echo.Context) error {
ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.CreateBankAccount")
defer span.Finish()
h.metrics.HttpCreateBankAccountRequests.Inc()
var command commands.CreateBankAccountCommand
if err := c.Bind(&command); err != nil {
h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
command.AggregateID = uuid.NewV4().String()
if err := h.validate.StructCtx(ctx, command); err != nil {
h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
err := h.bankAccountService.Commands.CreateBankAccount.Handle(ctx, command)
if err != nil {
h.log.Errorf("(CreateBankAccount.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.log.Infof("(BankAccount created) id: %s", command.AggregateID)
return c.JSON(http.StatusCreated, command.AggregateID)
}
}
func (h *bankAccountHandlers) DepositBalance() echo.HandlerFunc {
return func(c echo.Context) error {
ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.DepositBalance")
defer span.Finish()
h.metrics.HttpDepositBalanceRequests.Inc()
var command commands.DepositBalanceCommand
if err := c.Bind(&command); err != nil {
h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
command.AggregateID = c.Param(constants.ID)
if err := h.validate.StructCtx(ctx, command); err != nil {
h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
err := h.bankAccountService.Commands.DepositBalance.Handle(ctx, command)
if err != nil {
h.log.Errorf("(DepositBalance.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.log.Infof("(balance deposited) id: %s, amount: %d", command.AggregateID)
return c.NoContent(http.StatusOK)
}
}
func (h *bankAccountHandlers) WithdrawBalance() echo.HandlerFunc {
return func(c echo.Context) error {
ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.WithdrawBalance")
defer span.Finish()
h.metrics.HttpWithdrawBalanceRequests.Inc()
var command commands.WithdrawBalanceCommand
if err := c.Bind(&command); err != nil {
h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
command.AggregateID = c.Param(constants.ID)
if err := h.validate.StructCtx(ctx, command); err != nil {
h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
err := h.bankAccountService.Commands.WithdrawBalance.Handle(ctx, command)
if err != nil {
h.log.Errorf("(WithdrawBalance.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.log.Infof("(balance withdraw) id: %s, amount: %d", command.AggregateID)
return c.NoContent(http.StatusOK)
}
}
func (h *bankAccountHandlers) ChangeEmail() echo.HandlerFunc {
return func(c echo.Context) error {
ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.WithdrawBalance")
defer span.Finish()
h.metrics.HttpChangeEmailRequests.Inc()
var command commands.ChangeEmailCommand
if err := c.Bind(&command); err != nil {
h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
command.AggregateID = c.Param(constants.ID)
if err := h.validate.StructCtx(ctx, command); err != nil {
h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
err := h.bankAccountService.Commands.ChangeEmail.Handle(ctx, command)
if err != nil {
h.log.Errorf("(ChangeEmail.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.log.Infof("(balance withdraw) id: %s, amount: %d", command.AggregateID)
return c.NoContent(http.StatusOK)
}
}
func (h *bankAccountHandlers) GetByID() echo.HandlerFunc {
return func(c echo.Context) error {
ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.GetByID")
defer span.Finish()
h.metrics.HttpGetBuIdRequests.Inc()
var query queries.GetBankAccountByIDQuery
if err := c.Bind(&query); err != nil {
h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
query.AggregateID = c.Param(constants.ID)
fromStore := c.QueryParam("store")
if fromStore != "" {
isFromStore, err := strconv.ParseBool(fromStore)
if err != nil {
h.log.Errorf("strconv.ParseBool err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
query.FromEventStore = isFromStore
}
if err := h.validate.StructCtx(ctx, query); err != nil {
h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
bankAccountProjection, err := h.bankAccountService.Queries.GetBankAccountByID.Handle(ctx, query)
if err != nil {
h.log.Errorf("(ChangeEmail.Handle) id: %s, err: %v", query.AggregateID, tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.log.Infof("(get bank account) id: %s", bankAccountProjection.AggregateID)
return c.JSON(http.StatusOK, mappers.BankAccountMongoProjectionToHttp(bankAccountProjection))
}
}
func (h *bankAccountHandlers) Search() echo.HandlerFunc {
return func(c echo.Context) error {
ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.Search")
defer span.Finish()
h.metrics.HttpSearchRequests.Inc()
var query queries.SearchBankAccountsQuery
if err := c.Bind(&query); err != nil {
h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
query.QueryTerm = c.QueryParam("search")
query.Pagination = utils.NewPaginationFromQueryParams(c.QueryParam(constants.Size), c.QueryParam(constants.Page))
if err := h.validate.StructCtx(ctx, query); err != nil {
h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
searchResult, err := h.bankAccountService.Queries.SearchBankAccounts.Handle(ctx, query)
if err != nil {
h.log.Errorf("(SearchBankAccounts.Handle) id: %s, err: %v", query.QueryTerm, tracing.TraceWithErr(span, err))
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
response := mappers.SearchResultToHttp(searchResult.List, searchResult.PaginationResponse)
h.log.Infof("(search) result: %+v", response)
return c.JSON(http.StatusOK, response)
}
}
推荐使用bloomrpc,它是一款优秀的 gRPC 服务图形用户界面客户端。gRPC
服务处理程序:
func (g *grpcService) CreateBankAccount(ctx context.Context, request *bankAccountService.CreateBankAccountRequest) (*bankAccountService.CreateBankAccountResponse, error) {
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.CreateBankAccount")
defer span.Finish()
span.LogFields(log.String("req", request.String()))
g.metrics.GrpcCreateBankAccountRequests.Inc()
aggregateID := uuid.NewV4().String()
command := commands.CreateBankAccountCommand{
AggregateID: aggregateID,
Email: request.GetEmail(),
Address: request.GetAddress(),
FirstName: request.GetFirstName(),
LastName: request.GetLastName(),
Balance: request.GetBalance(),
Status: request.GetStatus(),
}
if err := g.validate.StructCtx(ctx, command); err != nil {
g.log.Errorf("validation err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
err := g.bankAccountService.Commands.CreateBankAccount.Handle(ctx, command)
if err != nil {
g.log.Errorf("(CreateBankAccount.Handle) err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
g.log.Infof("(grpcService) [created account] aggregateID: %s", aggregateID)
return &bankAccountService.CreateBankAccountResponse{Id: aggregateID}, nil
}
func (g *grpcService) DepositBalance(ctx context.Context, request *bankAccountService.DepositBalanceRequest) (*bankAccountService.DepositBalanceResponse, error) {
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.DepositBalance")
defer span.Finish()
span.LogFields(log.String("req", request.String()))
g.metrics.GrpcDepositBalanceRequests.Inc()
command := commands.DepositBalanceCommand{
AggregateID: request.GetId(),
Amount: request.GetAmount(),
PaymentID: request.GetPaymentId(),
}
if err := g.validate.StructCtx(ctx, command); err != nil {
g.log.Errorf("validation err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
err := g.bankAccountService.Commands.DepositBalance.Handle(ctx, command)
if err != nil {
g.log.Errorf("(DepositBalance.Handle) err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
g.log.Infof("(grpcService) [deposited balance] aggregateID: %s, amount: %v", request.GetId(), request.GetAmount())
return new(bankAccountService.DepositBalanceResponse), nil
}
func (g *grpcService) WithdrawBalance(ctx context.Context, request *bankAccountService.WithdrawBalanceRequest) (*bankAccountService.WithdrawBalanceResponse, error) {
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.WithdrawBalance")
defer span.Finish()
span.LogFields(log.String("req", request.String()))
g.metrics.GrpcWithdrawBalanceRequests.Inc()
command := commands.WithdrawBalanceCommand{
AggregateID: request.GetId(),
Amount: request.GetAmount(),
PaymentID: request.GetPaymentId(),
}
if err := g.validate.StructCtx(ctx, command); err != nil {
g.log.Errorf("validation err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
err := g.bankAccountService.Commands.WithdrawBalance.Handle(ctx, command)
if err != nil {
g.log.Errorf("(WithdrawBalance.Handle) err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
g.log.Infof("(grpcService) [withdraw balance] aggregateID: %s, amount: %v", request.GetId(), request.GetAmount())
return new(bankAccountService.WithdrawBalanceResponse), nil
}
func (g *grpcService) ChangeEmail(ctx context.Context, request *bankAccountService.ChangeEmailRequest) (*bankAccountService.ChangeEmailResponse, error) {
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.ChangeEmail")
defer span.Finish()
span.LogFields(log.String("req", request.String()))
g.metrics.GrpcChangeEmailRequests.Inc()
command := commands.ChangeEmailCommand{AggregateID: request.GetId(), NewEmail: request.GetEmail()}
if err := g.validate.StructCtx(ctx, command); err != nil {
g.log.Errorf("validation err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
err := g.bankAccountService.Commands.ChangeEmail.Handle(ctx, command)
if err != nil {
g.log.Errorf("(ChangeEmail.Handle) err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
g.log.Infof("(grpcService) [changed email] aggregateID: %s, newEmail: %s", request.GetId(), request.GetEmail())
return new(bankAccountService.ChangeEmailResponse), nil
}
func (g *grpcService) GetById(ctx context.Context, request *bankAccountService.GetByIdRequest) (*bankAccountService.GetByIdResponse, error) {
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetById")
defer span.Finish()
span.LogFields(log.String("req", request.String()))
g.metrics.GrpcGetBuIdRequests.Inc()
query := queries.GetBankAccountByIDQuery{AggregateID: request.GetId(), FromEventStore: request.IsOwner}
if err := g.validate.StructCtx(ctx, query); err != nil {
g.log.Errorf("validation err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
bankAccountProjection, err := g.bankAccountService.Queries.GetBankAccountByID.Handle(ctx, query)
if err != nil {
g.log.Errorf("(GetBankAccountByID.Handle) err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
g.log.Infof("(grpcService) [get account by id] projection: %+v", bankAccountProjection)
return &bankAccountService.GetByIdResponse{BankAccount: mappers.BankAccountMongoProjectionToProto(bankAccountProjection)}, nil
}
func (g *grpcService) SearchBankAccounts(ctx context.Context, request *bankAccountService.SearchBankAccountsRequest) (*bankAccountService.SearchBankAccountsResponse, error) {
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.SearchBankAccounts")
defer span.Finish()
span.LogFields(log.String("req", request.String()))
g.metrics.GrpcSearchRequests.Inc()
query := queries.SearchBankAccountsQuery{
QueryTerm: request.GetSearchText(),
Pagination: &utils.Pagination{
Size: int(request.GetSize()),
Page: int(request.GetPage()),
},
}
if err := g.validate.StructCtx(ctx, query); err != nil {
g.log.Errorf("validation err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
searchQueryResult, err := g.bankAccountService.Queries.SearchBankAccounts.Handle(ctx, query)
if err != nil {
g.log.Errorf("(SearchBankAccounts.Handle) err: %v", err)
return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
}
g.log.Infof("(grpcService) [search] result: %+vv", searchQueryResult.PaginationResponse)
return &bankAccountService.SearchBankAccountsResponse{
BankAccounts: mappers.SearchBankAccountsListToProto(searchQueryResult.List),
Pagination: mappers.PaginationResponseToProto(searchQueryResult.PaginationResponse),
}, nil
}
命令的主要属性在于,当命令成功执行时,系统会转换到新的状态。
命令处理程序负责处理命令、修改状态或执行其他副作用。
命令服务处理 CQRS 命令,从事件存储中加载聚合数据,并根据应用程序的业务逻辑调用其方法。
聚合数据应用这些更改,然后将这些事件更改列表保存到事件存储中。
这里创建银行账户的命令很简单,但在现实世界中,业务逻辑当然要复杂得多,我们必须检查电子邮件是否可用等等。
type CreateBankAccountCommand struct {
AggregateID string `json:"id" validate:"required,gte=0"`
Email string `json:"email" validate:"required,gte=0,email"`
Address string `json:"address" validate:"required,gte=0"`
FirstName string `json:"firstName" validate:"required,gte=0"`
LastName string `json:"lastName" validate:"required,gte=0"`
Balance int64 `json:"balance" validate:"gte=0"`
Status string `json:"status"`
}
type CreateBankAccount interface {
Handle(ctx context.Context, cmd CreateBankAccountCommand) error
}
type createBankAccountCmdHandler struct {
log logger.Logger
aggregateStore es.AggregateStore
}
func NewCreateBankAccountCmdHandler(log logger.Logger, aggregateStore es.AggregateStore) *createBankAccountCmdHandler {
return &createBankAccountCmdHandler{log: log, aggregateStore: aggregateStore}
}
func (c *createBankAccountCmdHandler) Handle(ctx context.Context, cmd CreateBankAccountCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createBankAccountCmdHandler.Handle")
defer span.Finish()
span.LogFields(log.Object("command", cmd))
exists, err := c.aggregateStore.Exists(ctx, cmd.AggregateID)
if err != nil {
return tracing.TraceWithErr(span, err)
}
if exists {
return tracing.TraceWithErr(span, errors.New("already exists"))
}
bankAccountAggregate := domain.NewBankAccountAggregate(cmd.AggregateID)
err = bankAccountAggregate.CreateNewBankAccount(ctx, cmd.Email, cmd.Address, cmd.FirstName, cmd.LastName, cmd.Status, cmd.Balance)
if err != nil {
return tracing.TraceWithErr(span, err)
}
return c.aggregateStore.Save(ctx, bankAccountAggregate)
}
在事件溯源中,投影(也称为视图模型或查询模型)提供了底层基于事件的数据模型的视图。
它们通常代表将源写入模型转换为读取模型的逻辑。
其核心思想是,投影会接收所有能够投影的事件,并
使用读取模型数据库提供的常规 CRUD 操作,对其控制的读取模型执行常规的 CRUD 操作。
投影不仅限于处理单个实体的事件,还可以为多个实体(甚至是不同类型的实体)收集和聚合数据。
事件存储中附加的事件会触发创建或更新读取模型的投影逻辑。
我们可以订阅订单类型流事件的投影。
当我们执行命令时,聚合会生成一个新事件,表示聚合的状态转换。
这些事件会被提交到存储中,因此存储会将它们附加到聚合流的末尾。
投影接收这些事件,并使用 `When` 方法更新其读取模型。与聚合类似,它会根据事件类型应用相应的更改。
func (s *mongoSubscription) ProcessMessagesErrGroup(ctx context.Context, r *kafka.Reader, workerID int) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
m, err := r.FetchMessage(ctx)
if err != nil {
s.log.Warnf("(mongoSubscription) workerID: %d, err: %v", workerID, err)
continue
}
switch m.Topic {
case es.GetTopicName(s.cfg.KafkaPublisherConfig.TopicPrefix, string(domain.BankAccountAggregateType)):
s.handleBankAccountEvents(ctx, r, m)
}
}
}
func (s *mongoSubscription) handleBankAccountEvents(ctx context.Context, r *kafka.Reader, m kafka.Message) {
ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "mongoSubscription.handleBankAccountEvents")
defer span.Finish()
var events []es.Event
if err := serializer.Unmarshal(m.Value, &events); err != nil {
s.log.Errorf("serializer.Unmarshal: %v", tracing.TraceWithErr(span, err))
s.commitErrMessage(ctx, r, m)
return
}
for _, event := range events {
if err := s.handle(ctx, r, m, event); err != nil {
return
}
}
s.commitMessage(ctx, r, m)
}
func (s *mongoSubscription) handle(ctx context.Context, r *kafka.Reader, m kafka.Message, event es.Event) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoSubscription.handle")
defer span.Finish()
err := s.projection.When(ctx, event)
if err != nil {
s.log.Errorf("MongoSubscription When err: %v", err)
recreateErr := s.recreateProjection(ctx, event)
if recreateErr != nil {
return tracing.TraceWithErr(span, errors.Wrapf(recreateErr, "recreateProjection err: %v", err))
}
s.commitErrMessage(ctx, r, m)
return tracing.TraceWithErr(span, errors.Wrapf(err, "When type: %s, aggregateID: %s", event.GetEventType(), event.GetAggregateID()))
}
s.log.Infof("MongoSubscription <<<commit>>> event: %s", event.String())
return nil
}

MongoDB投影通过实现When方法来处理事件,
处理事件并应用诸如聚合之类的更改,然后使用MongoDB存储库保存它:
type bankAccountMongoProjection struct {
log logger.Logger
cfg *config.Config
serializer es.Serializer
mongoRepository domain.MongoRepository
}
func NewBankAccountMongoProjection(
log logger.Logger,
cfg *config.Config,
serializer es.Serializer,
mongoRepository domain.MongoRepository,
) *bankAccountMongoProjection {
return &bankAccountMongoProjection{log: log, cfg: cfg, serializer: serializer, mongoRepository: mongoRepository}
}
func (b *bankAccountMongoProjection) When(ctx context.Context, esEvent es.Event) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.When")
defer span.Finish()
deserializedEvent, err := b.serializer.DeserializeEvent(esEvent)
if err != nil {
return errors.Wrapf(err, "serializer.DeserializeEvent aggregateID: %s, type: %s", esEvent.GetAggregateID(), esEvent.GetEventType())
}
switch event := deserializedEvent.(type) {
case *events.BankAccountCreatedEventV1:
return b.onBankAccountCreated(ctx, esEvent, event)
case *events.BalanceDepositedEventV1:
return b.onBalanceDeposited(ctx, esEvent, event)
case *events.BalanceWithdrawnEventV1:
return b.onBalanceWithdrawn(ctx, esEvent, event)
case *events.EmailChangedEventV1:
return b.onEmailChanged(ctx, esEvent, event)
default:
return errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "esEvent: %s", esEvent.String())
}
}
func (b *bankAccountMongoProjection) onBankAccountCreated(ctx context.Context, esEvent es.Event, event *events.BankAccountCreatedEventV1) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBankAccountCreated")
defer span.Finish()
span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))
if esEvent.GetVersion() != 1 {
return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, version: %d", esEvent.GetEventType(), esEvent.GetVersion())
}
projection := &domain.BankAccountMongoProjection{
AggregateID: esEvent.GetAggregateID(),
Version: esEvent.GetVersion(),
Email: event.Email,
Address: event.Address,
FirstName: event.FirstName,
LastName: event.LastName,
Balance: domain.Balance{
Amount: event.Balance.AsMajorUnits(),
Currency: event.Balance.Currency().Code,
},
Status: event.Status,
UpdatedAt: time.Now().UTC(),
CreatedAt: time.Now().UTC(),
}
err := b.mongoRepository.Insert(ctx, projection)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBankAccountCreated] mongoRepository.Insert aggregateID: %s", esEvent.GetAggregateID()))
}
b.log.Infof("[onBankAccountCreated] projection: %#v", projection)
return nil
}
func (b *bankAccountMongoProjection) onBalanceDeposited(ctx context.Context, esEvent es.Event, event *events.BalanceDepositedEventV1) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBalanceDeposited")
defer span.Finish()
span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))
if err := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), func(projection *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
projection.Balance.Amount += money.New(event.Amount, money.USD).AsMajorUnits()
projection.Version = esEvent.GetVersion()
return projection
}, esEvent.GetVersion()-1); err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
}
b.log.Infof("[onBalanceDeposited] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
return nil
}
func (b *bankAccountMongoProjection) onBalanceWithdrawn(ctx context.Context, esEvent es.Event, event *events.BalanceWithdrawnEventV1) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBalanceWithdrawn")
defer span.Finish()
span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))
if err := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), func(projection *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
projection.Balance.Amount -= money.New(event.Amount, money.USD).AsMajorUnits()
projection.Version = esEvent.GetVersion()
return projection
}, esEvent.GetVersion()-1); err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
}
b.log.Infof("[onBalanceWithdrawn] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
return nil
}
func (b *bankAccountMongoProjection) onEmailChanged(ctx context.Context, esEvent es.Event, event *events.EmailChangedEventV1) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onEmailChanged")
defer span.Finish()
span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))
if err := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), func(projection *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
projection.Email = event.Email
projection.Version = esEvent.GetVersion()
return projection
}, esEvent.GetVersion()-1); err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onEmailChanged] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
}
b.log.Infof("[onEmailChanged] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
return nil
}
ElasticSearch 投影也做同样的事情,即为文档建立索引以便搜索:
type elasticProjection struct {
log logger.Logger
cfg *config.Config
serializer es.Serializer
elasticSearchRepo domain.ElasticSearchRepository
}
func NewElasticProjection(log logger.Logger, cfg *config.Config, serializer es.Serializer, elasticSearchRepo domain.ElasticSearchRepository) *elasticProjection {
return &elasticProjection{log: log, cfg: cfg, serializer: serializer, elasticSearchRepo: elasticSearchRepo}
}
func (e *elasticProjection) When(ctx context.Context, esEvent es.Event) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.When")
defer span.Finish()
deserializedEvent, err := e.serializer.DeserializeEvent(esEvent)
if err != nil {
return errors.Wrapf(err, "serializer.DeserializeEvent aggregateID: %s, type: %s", esEvent.GetAggregateID(), esEvent.GetEventType())
}
switch event := deserializedEvent.(type) {
case *events.BankAccountCreatedEventV1:
return e.onBankAccountCreated(ctx, esEvent, event)
case *events.BalanceDepositedEventV1:
return e.onBalanceDeposited(ctx, esEvent, event)
case *events.BalanceWithdrawnEventV1:
return e.onBalanceWithdrawn(ctx, esEvent, event)
case *events.EmailChangedEventV1:
return e.onEmailChanged(ctx, esEvent, event)
default:
return errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "esEvent: %s", esEvent.String())
}
}
func (e *elasticProjection) onBankAccountCreated(ctx context.Context, esEvent es.Event, event *events.BankAccountCreatedEventV1) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBankAccountCreated")
defer span.Finish()
span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))
if esEvent.GetVersion() != 1 {
return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, version: %d", esEvent.GetEventType(), esEvent.GetVersion())
}
projection := &domain.ElasticSearchProjection{
ID: esEvent.GetAggregateID(),
AggregateID: esEvent.GetAggregateID(),
Version: esEvent.GetVersion(),
Email: event.Email,
Address: event.Address,
FirstName: event.FirstName,
LastName: event.LastName,
Balance: domain.Balance{
Amount: event.Balance.AsMajorUnits(),
Currency: event.Balance.Currency().Code,
},
Status: event.Status,
UpdatedAt: time.Now().UTC(),
CreatedAt: time.Now().UTC(),
}
err := e.elasticSearchRepo.Index(ctx, projection)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] elasticSearchRepo.Index aggregateID: %s", esEvent.GetAggregateID()))
}
e.log.Infof("ElasticSearch when [onBankAccountCreated] projection: %s", projection)
return nil
}
func (e *elasticProjection) onBalanceDeposited(ctx context.Context, esEvent es.Event, event *events.BalanceDepositedEventV1) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBalanceDeposited")
defer span.Finish()
span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))
projection, err := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
}
if err := e.validateEventVersion(projection.Version, esEvent); err != nil {
return tracing.TraceWithErr(span, err)
}
projection.Balance.Amount += money.New(event.Amount, money.USD).AsMajorUnits()
projection.Version = esEvent.GetVersion()
if err := e.elasticSearchRepo.Update(ctx, projection); err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
}
e.log.Infof("ElasticSearch when [onBalanceDeposited] projection: %s", projection)
return nil
}
func (e *elasticProjection) onBalanceWithdrawn(ctx context.Context, esEvent es.Event, event *events.BalanceWithdrawnEventV1) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBalanceWithdrawn")
defer span.Finish()
span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))
projection, err := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
}
if err := e.validateEventVersion(projection.Version, esEvent); err != nil {
return tracing.TraceWithErr(span, err)
}
projection.Balance.Amount -= money.New(event.Amount, money.USD).AsMajorUnits()
projection.Version = esEvent.GetVersion()
if err := e.elasticSearchRepo.Update(ctx, projection); err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
}
e.log.Infof("ElasticSearch when [onBalanceWithdrawn] projection: %s", projection)
return nil
}
func (e *elasticProjection) onEmailChanged(ctx context.Context, esEvent es.Event, event *events.EmailChangedEventV1) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onEmailChanged")
defer span.Finish()
span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))
projection, err := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onEmailChanged] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
}
if err := e.validateEventVersion(projection.Version, esEvent); err != nil {
return tracing.TraceWithErr(span, err)
}
projection.Email = event.Email
projection.Version = esEvent.GetVersion()
if err := e.elasticSearchRepo.Update(ctx, projection); err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[onEmailChanged] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
}
e.log.Infof("ElasticSearch when [onEmailChanged] projection: %s", projection)
return nil
}
func (e *elasticProjection) validateEventVersion(version uint64, esEvent es.Event) error {
if version != esEvent.GetVersion()-1 {
return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, eventVersion: %d, projectionVersion: %d", esEvent.GetEventType(), esEvent.GetVersion(), version)
}
return nil
}
在 CQRS 中,查询代表获取数据的意图,并负责返回请求查询的结果。
读取模型可以源自写入模型,但并非必须如此。
它是将业务操作结果转换为可读形式的过程。
事件溯源系统的一大优势在于能够随时创建新的读取模型,
而不会影响其他任何内容。
然后,我们可以使用以下查询来检索投影数据:
通过 ID 获取银行账户查询可以从 MongoDB 或聚合存储中加载(如果需要):
type GetBankAccountByIDQuery struct {
AggregateID string `json:"aggregateID" validate:"required,gte=0"`
FromEventStore bool `json:"fromEventStore"`
}
type GetBankAccountByID interface {
Handle(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error)
}
type getBankAccountByIDQuery struct {
log logger.Logger
aggregateStore es.AggregateStore
mongoRepository domain.MongoRepository
}
func NewGetBankAccountByIDQuery(log logger.Logger, aggregateStore es.AggregateStore, mongoRepository domain.MongoRepository) *getBankAccountByIDQuery {
return &getBankAccountByIDQuery{log: log, aggregateStore: aggregateStore, mongoRepository: mongoRepository}
}
func (q *getBankAccountByIDQuery) Handle(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "getBankAccountByIDQuery.Handle")
defer span.Finish()
span.LogFields(log.Object("query", query))
if query.FromEventStore {
return q.loadFromAggregateStore(ctx, query)
}
projection, err := q.mongoRepository.GetByAggregateID(ctx, query.AggregateID)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
bankAccountAggregate := domain.NewBankAccountAggregate(query.AggregateID)
if err = q.aggregateStore.Load(ctx, bankAccountAggregate); err != nil {
return nil, tracing.TraceWithErr(span, err)
}
if bankAccountAggregate.GetVersion() == 0 {
return nil, tracing.TraceWithErr(span, errors.Wrapf(bankAccountErrors.ErrBankAccountNotFound, "id: %s", query.AggregateID))
}
mongoProjection := mappers.BankAccountToMongoProjection(bankAccountAggregate)
err = q.mongoRepository.Upsert(ctx, mongoProjection)
if err != nil {
q.log.Errorf("(GetBankAccountByIDQuery) mongo Upsert AggregateID: %s, err: %v", query.AggregateID, tracing.TraceWithErr(span, err))
}
q.log.Debugf("(GetBankAccountByIDQuery) Upsert %+v", query)
return mongoProjection, nil
}
return nil, tracing.TraceWithErr(span, err)
}
q.log.Debugf("(GetBankAccountByIDQuery) from mongo %+v", query)
return projection, nil
}
func (q *getBankAccountByIDQuery) loadFromAggregateStore(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "getBankAccountByIDQuery.loadFromAggregateStore")
defer span.Finish()
bankAccountAggregate := domain.NewBankAccountAggregate(query.AggregateID)
if err := q.aggregateStore.Load(ctx, bankAccountAggregate); err != nil {
return nil, tracing.TraceWithErr(span, err)
}
if bankAccountAggregate.GetVersion() == 0 {
return nil, tracing.TraceWithErr(span, errors.Wrapf(bankAccountErrors.ErrBankAccountNotFound, "id: %s", query.AggregateID))
}
q.log.Debugf("(GetBankAccountByIDQuery) from aggregateStore bankAccountAggregate: %+v", bankAccountAggregate.BankAccount)
return mappers.BankAccountToMongoProjection(bankAccountAggregate), nil
}
使用官方MongoDB 客户端的银行账户 MongoDB 存储库方法:
type bankAccountMongoRepository struct {
log logger.Logger
cfg *config.Config
db *mongo.Client
}
func NewBankAccountMongoRepository(log logger.Logger, cfg *config.Config, db *mongo.Client) *bankAccountMongoRepository {
return &bankAccountMongoRepository{log: log, cfg: cfg, db: db}
}
func (b *bankAccountMongoRepository) Insert(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Insert")
defer span.Finish()
span.LogFields(log.String("aggregateID", projection.AggregateID))
_, err := b.bankAccountsCollection().InsertOne(ctx, projection)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[InsertOne] AggregateID: %s", projection.AggregateID))
}
b.log.Debugf("[Insert] result AggregateID: %s", projection.AggregateID)
return nil
}
func (b *bankAccountMongoRepository) Update(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Update")
defer span.Finish()
span.LogFields(log.String("aggregateID", projection.AggregateID))
projection.ID = ""
projection.UpdatedAt = time.Now().UTC()
ops := options.FindOneAndUpdate()
ops.SetReturnDocument(options.After)
ops.SetUpsert(false)
filter := bson.M{constants.MongoAggregateID: projection.AggregateID}
err := b.bankAccountsCollection().FindOneAndUpdate(ctx, filter, bson.M{"$set": projection}, ops).Decode(projection)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOneAndUpdate] aggregateID: %s", projection.AggregateID))
}
b.log.Debugf("[Update] result AggregateID: %s", projection.AggregateID)
return nil
}
func (b *bankAccountMongoRepository) UpdateConcurrently(ctx context.Context, aggregateID string, updateCb domain.UpdateProjectionCallback, expectedVersion uint64) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Update")
defer span.Finish()
span.LogFields(log.String("aggregateID", aggregateID))
session, err := b.db.StartSession()
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "StartSession aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
}
defer session.EndSession(ctx)
err = mongo.WithSession(ctx, session, func(sessionContext mongo.SessionContext) error {
if err := session.StartTransaction(); err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "StartTransaction aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
}
filter := bson.M{constants.MongoAggregateID: aggregateID}
foundProjection := &domain.BankAccountMongoProjection{}
err := b.bankAccountsCollection().FindOne(ctx, filter).Decode(foundProjection)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOne] aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
}
if foundProjection.Version != expectedVersion {
return tracing.TraceWithErr(span, errors.Wrapf(es.ErrInvalidEventVersion, "[FindOne] aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
}
foundProjection = updateCb(foundProjection)
foundProjection.ID = ""
foundProjection.UpdatedAt = time.Now().UTC()
ops := options.FindOneAndUpdate()
ops.SetReturnDocument(options.After)
ops.SetUpsert(false)
filter = bson.M{constants.MongoAggregateID: foundProjection.AggregateID}
err = b.bankAccountsCollection().FindOneAndUpdate(ctx, filter, bson.M{"$set": foundProjection}, ops).Decode(foundProjection)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOneAndUpdate] aggregateID: %s, expectedVersion: %d", foundProjection.AggregateID, expectedVersion))
}
b.log.Infof("[UpdateConcurrently] result AggregateID: %s, expectedVersion: %d", foundProjection.AggregateID, expectedVersion)
return session.CommitTransaction(ctx)
})
if err != nil {
if err := session.AbortTransaction(ctx); err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "AbortTransaction aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
}
return tracing.TraceWithErr(span, errors.Wrapf(err, "mongo.WithSession aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
}
return nil
}
func (b *bankAccountMongoRepository) Upsert(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Update")
defer span.Finish()
span.LogFields(log.String("aggregateID", projection.AggregateID))
projection.UpdatedAt = time.Now().UTC()
ops := options.FindOneAndUpdate()
ops.SetReturnDocument(options.After)
ops.SetUpsert(true)
filter := bson.M{constants.MongoAggregateID: projection.AggregateID}
err := b.bankAccountsCollection().FindOneAndUpdate(ctx, filter, bson.M{"$set": projection}, ops).Decode(projection)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "Upsert [FindOneAndUpdate] aggregateID: %s", projection.AggregateID))
}
b.log.Debugf("[Upsert] result AggregateID: %s", projection.AggregateID)
return nil
}
func (b *bankAccountMongoRepository) DeleteByAggregateID(ctx context.Context, aggregateID string) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.DeleteByAggregateID")
defer span.Finish()
span.LogFields(log.String("aggregateID", aggregateID))
filter := bson.M{constants.MongoAggregateID: aggregateID}
ops := options.Delete()
result, err := b.bankAccountsCollection().DeleteOne(ctx, filter, ops)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "DeleteByAggregateID [FindOneAndDelete] aggregateID: %s", aggregateID))
}
b.log.Debugf("[DeleteByAggregateID] result AggregateID: %s, deleteCount: %d", aggregateID, result.DeletedCount)
return nil
}
func (b *bankAccountMongoRepository) GetByAggregateID(ctx context.Context, aggregateID string) (*domain.BankAccountMongoProjection, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.GetByAggregateID")
defer span.Finish()
span.LogFields(log.String("aggregateID", aggregateID))
filter := bson.M{constants.MongoAggregateID: aggregateID}
var projection domain.BankAccountMongoProjection
err := b.bankAccountsCollection().FindOne(ctx, filter).Decode(&projection)
if err != nil {
return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOne] aggregateID: %s", projection.AggregateID))
}
b.log.Debugf("[GetByAggregateID] result projection: %+v", projection)
return &projection, nil
}
func (b *bankAccountMongoRepository) bankAccountsCollection() *mongo.Collection {
return b.db.Database(b.cfg.Mongo.Db).Collection(b.cfg.MongoCollections.BankAccounts)
}
ElasticSearch 存储库的实现使用了go-elasticsearch官方库,
另一个不错的库是olivere elastic,但它不支持本项目使用的 8 版本。
type elasticRepo struct {
log logger.Logger
cfg *config.Config
client *elasticsearch.Client
}
func NewElasticRepository(log logger.Logger, cfg *config.Config, client *elasticsearch.Client) *elasticRepo {
return &elasticRepo{log: log, cfg: cfg, client: client}
}
func (e *elasticRepo) Index(ctx context.Context, projection *domain.ElasticSearchProjection) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Index")
defer span.Finish()
span.LogFields(log.String("aggregateID", projection.AggregateID))
response, err := esclient.Index(ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, projection.AggregateID, projection)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.Index id: %s", projection.AggregateID))
}
defer response.Body.Close()
if response.IsError() {
return tracing.TraceWithErr(span, errors.Wrapf(errors.New("ElasticSearch request err"), "response.IsError id: %s", projection.AggregateID))
}
if response.HasWarnings() {
e.log.Warnf("ElasticSearch Index warnings: %+v", response.Warnings())
}
e.log.Infof("ElasticSearch index result: %s", response.String())
return nil
}
func (e *elasticRepo) Update(ctx context.Context, projection *domain.ElasticSearchProjection) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Update")
defer span.Finish()
span.LogFields(log.String("aggregateID", projection.AggregateID))
projection.UpdatedAt = time.Now().UTC()
response, err := esclient.Update(ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, projection.AggregateID, projection)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.Update id: %s", projection.AggregateID))
}
defer response.Body.Close()
if response.IsError() {
return tracing.TraceWithErr(span, errors.Wrapf(errors.New("ElasticSearch request err"), "response.IsError id: %s", projection.AggregateID))
}
if response.HasWarnings() {
e.log.Warnf("ElasticSearch Update warnings: %+v", response.Warnings())
}
e.log.Infof("ElasticSearch update result: %s", response.String())
return nil
}
func (e *elasticRepo) DeleteByAggregateID(ctx context.Context, aggregateID string) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.DeleteByAggregateID")
defer span.Finish()
span.LogFields(log.String("aggregateID", aggregateID))
response, err := esclient.Delete(ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, aggregateID)
if err != nil {
return tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.Delete id: %s", aggregateID))
}
defer response.Body.Close()
if response.IsError() && response.StatusCode != http.StatusNotFound {
return tracing.TraceWithErr(span, errors.Wrapf(errors.New("ElasticSearch delete"), "response.IsError aggregateID: %s, status: %s", aggregateID, response.Status()))
}
if response.HasWarnings() {
e.log.Warnf("ElasticSearch Delete warnings: %+v", response.Warnings())
}
e.log.Infof("ElasticSearch delete result: %s", response.String())
return nil
}
func (e *elasticRepo) GetByAggregateID(ctx context.Context, aggregateID string) (*domain.ElasticSearchProjection, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.GetByAggregateID")
defer span.Finish()
span.LogFields(log.String("aggregateID", aggregateID))
response, err := esclient.GetByID[*domain.ElasticSearchProjection](ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, aggregateID)
if err != nil {
return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.GetByID id: %s", aggregateID))
}
e.log.Infof("ElasticSearch delete result: %+v", response)
return response.Source, nil
}
func (e *elasticRepo) Search(ctx context.Context, term string, options esclient.SearchOptions) (*esclient.SearchListResponse[*domain.ElasticSearchProjection], error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Search")
defer span.Finish()
span.LogFields(log.String("term", term))
searchMatchPrefixRequest := esclient.SearchMatchPrefixRequest{
Index: []string{e.cfg.ElasticIndexes.BankAccounts},
Term: term,
Size: options.Size,
From: options.From,
Sort: []string{"balance.amount"},
Fields: options.Fields,
SortMap: map[string]interface{}{"balance.amount": "asc"},
}
if options.Sort != nil {
searchMatchPrefixRequest.Sort = options.Sort
}
response, err := esclient.SearchMultiMatchPrefix[*domain.ElasticSearchProjection](ctx, e.client, searchMatchPrefixRequest)
if err != nil {
return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.SearchMultiMatchPrefix term: %s", term))
}
return response, nil
}
您可以在这里找到更多详细信息和完整项目的源代码。当然,
在实际应用中,业务逻辑和基础设施代码要复杂得多,我们需要实现更多必要的功能。
希望这篇文章对您有所帮助,欢迎您提出任何反馈或问题,请随时通过电子邮件或任何即时通讯工具与我联系 :)











