发布于 2026-01-06 0 阅读
0

使用 PostgreSQL、Kafka、MongoDB 和 ElasticSearch 实现事件溯源和 CQRS 👋✨💫

使用 PostgreSQL、Kafka、MongoDB 和 ElasticSearch 实现事件溯源和 CQRS 👋✨💫

👨‍💻 已使用物品清单:

PostgreSQL作为事件存储数据库,
Kafka作为消息代理,
gRPC是 Go 语言实现的 gRPC ,
Jaeger 是一个开源的端到端分布式追踪工具,
Prometheus提供监控和告警功能,
Grafana用于构建包含 Prometheus 和 MongoDB 数据的可观测性仪表盘
MongoDB 是一个数据库
,Elasticsearch是一个 Go 语言的 Elasticsearch 客户端。Echo
是一个Web 框架
,Kibana是一个用于 Elasticsearch 的数据可视化仪表盘软件,
Migrate用于数据迁移。

源代码可在GitHub 仓库中找到。
本项目的主要思路是使用 Go、PostgreSQL 和 Kafka 实现事件溯源和 CQRS,并使用 MongoDB 和 ElasticSearch 进行读取投影。之前我曾写过类似的文章,使用Go、EventStoreDB Spring实现
了相同的微服务。正如 之前所述,我认为 EventStoreDB 是事件溯源的最佳选择,但在实际项目中,我们通常会受到一些业务限制,例如 EventStoreDB 的使用可能不被允许。在这种情况下,PostgreSQL 和 Kafka 是实现自定义事件存储的良好替代方案。 如果您不熟悉事件溯源和 CQRS 模式,microservices.io 是最佳的学习资源。EventStoreDB网站博客 和文档也非常出色, 强烈推荐Alexey Zimarev 的《.NET Core 领域驱动设计实战》一书。





本项目使用PostgreSQLKafka实现事件存储的微服务,并使用MongoDBElasticsearch
作为投影的读取数据库 本文中的一些描述与之前重复,因为之前也有使用 PostgreSQL 和 Kafka 的实现,但思路相同。

所有用户界面都将通过以下端口提供:

Jaeger UI:http://localhost:16686

杰格

Prometheus 用户界面:http://localhost:9090

普罗米修斯

Grafana 用户界面:http://localhost:3005

格拉法纳

此项目的 Docker Compose 文件:



version: "3.9"

services:
  es_postgesql:
    image: postgres:14.4
    container_name: es_postgesql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=bank_accounts
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  zookeeper:
    image: 'bitnami/zookeeper:3.8.0'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - "./zookeeper:/zookeeper"
    networks: [ "microservices" ]

  kafka:
    image: 'bitnami/kafka:3.2.0'
    ports:
      - "9092:9092"
      - "9093:9093"
    volumes:
      - "./kafka_data:/bitnami"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    networks: [ "microservices" ]

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - "27017:27017"
    volumes:
      - ./mongodb_data_container:/data/db
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.35
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

  node01:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.2.3
    container_name: node01
    restart: always
    environment:
      - node.name=node01
      - cluster.name=es-cluster-8
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.license.self_generated.type=basic
      - xpack.security.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es-data01:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    networks: [ "microservices" ]

  kibana:
    image: docker.elastic.co/kibana/kibana:8.2.3
    restart: always
    environment:
      ELASTICSEARCH_HOSTS: http://node01:9200
    ports:
      - "5601:5601"
    depends_on:
      - node01
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

volumes:
  mongodb_data_container:
  es-data01:
    driver: local

networks:
  microservices:
    name: microservices


Enter fullscreen mode Exit fullscreen mode

在事件溯源中,我们会存储实体发生的所有操作的历史记录,并从中推导出实体的状态。
我们可以回溯读取这些历史记录,从而确定实体在特定时间点的状态。
这是一种将数据作为事件存储在仅追加日志中的模式。

每个新事件都是一次变更。AggregateBase应该跟踪命令执行流程中发生的所有变更,以便 我们可以将这些变更持久化到命令处理程序中。
聚合会获取当前状态,验证特定操作的业务规则 ,并应用返回新状态的业务逻辑。此过程的关键在于“要么全部保存,要么全部不保存”。所有 聚合数据都必须成功保存。如果任何一条规则或操作失败,则整个状态变更将被拒绝。AggregateRoot 可以通过不同的方式实现,其主要方法是加载事件、应用和引发变更。 当我们从数据库中获取聚合时, 我们不会将其状态作为表或文档中的一条记录读取,而是读取之前保存的所有事件,并对每个事件调用when方法。 完成所有这些步骤后,我们将恢复给定聚合的所有历史记录。 通过这种方式,我们将聚合更新到其最新状态。










// AggregateRoot contains all methods of AggregateBase
type AggregateRoot interface {
    GetID() string
    SetID(id string) *AggregateBase
    GetType() AggregateType
    SetType(aggregateType AggregateType)
    GetChanges() []any
    ClearChanges()
    GetVersion() uint64
    ToSnapshot()
    String() string
    Load
    Apply
    RaiseEvent
}

// AggregateType type of the Aggregate
type AggregateType string

// AggregateBase base aggregate contains all main necessary fields
type AggregateBase struct {
    ID      string
    Version uint64
    Changes []any
    Type    AggregateType
    when    when
}

func NewAggregateBase(when when) *AggregateBase {
    if when == nil {
        return nil
    }

    return &AggregateBase{
        Version: startVersion,
        Changes: make([]any, 0, changesEventsCap),
        when:    when,
    }
}

// ClearChanges clear AggregateBase uncommitted Event's
func (a *AggregateBase) ClearChanges() {
    a.Changes = make([]any, 0, changesEventsCap)
}

// GetChanges get AggregateBase uncommitted Event's
func (a *AggregateBase) GetChanges() []any {
    return a.Changes
}

// Load add existing events from event store to aggregate using When interface method
func (a *AggregateBase) Load(events []any) error {
    for _, evt := range events {
        if err := a.when(evt); err != nil {
            return err
        }
        a.Version++
    }
    return nil
}

// Apply push event to aggregate uncommitted events using When method
func (a *AggregateBase) Apply(event any) error {
    if err := a.when(event); err != nil {
        return err
    }
    a.Version++
    a.Changes = append(a.Changes, event)
    return nil
}

// RaiseEvent push event to aggregate applied events using When method, used for load directly from eventstore
func (a *AggregateBase) RaiseEvent(event any) error {
    if err := a.when(event); err != nil {
        return err
    }
    a.Version++
    return nil
}



Enter fullscreen mode Exit fullscreen mode

事件代表领域中发生的事实。它们是真理的来源,当前状态源自于事件。
事件是不可变的,代表着业务事实。
在事件溯源中,对聚合数据执行的每一次操作都应该产生一个新的事件。
事件代表领域中发生的事实。它们是真理的来源,当前状态源自于事件。事件
是不可变的,代表着业务事实。
这意味着我们永远不会更改或删除数据库中的任何内容,我们只会追加新的事件。

Postgres



type Event struct {
    EventID       string
    AggregateID   string
    EventType     EventType
    AggregateType AggregateType
    Version       uint64
    Data          []byte
    Metadata      []byte
    Timestamp     time.Time
}


Enter fullscreen mode Exit fullscreen mode

快照代表了某个“时间点”的当前状态。
如果我们严格按照事件溯源模式来操作,就需要获取所有这些交易记录来计算当前账户余额。
这样做效率很低。为了提高效率,你可能首先想到的是将最新状态缓存起来。
与其检索所有这些事件,我们只需检索一条记录并将其用于业务逻辑即可。这就是快照
其基本逻辑是:读取快照(如果存在),然后从事件存储中读取事件;
如果存在快照,则读取自上次创建快照的流修订版本以来的事件;否则,读取所有事件。
在我们的微服务中,我们存储了每 N 个事件的快照。
如果性能足够好,则可能不需要快照。



type Snapshot struct {
    ID      string        `json:"id"`
    Type    AggregateType `json:"type"`
    State   []byte        `json:"state"`
    Version uint64        `json:"version"`
}


Enter fullscreen mode Exit fullscreen mode

事件存储是系统的关键组成部分。域中发生的每一次变更都会被记录在数据库中。
它专门用于存储变更历史记录,状态由仅追加的事件日志表示。
事件是不可变的:它们不能被修改。AggregateStore
的实现包括 Load、Save 和 Exists 方法。Load
和 Save 方法接受聚合,然后使用 EventStoreDB 客户端加载或应用事件。Load
方法查找聚合的流名称,从聚合流中读取所有事件,
遍历所有事件,并为每个事件调用 RaiseEvent 处理程序。Save
方法通过保存变更历史记录来持久化聚合,并处理并发。
当您从 EventStore 中检索流时,会记录当前版本号,
然后在保存时,可以确定在此期间是否有人修改了该记录。



// Load es.Aggregate events using snapshots with given frequency
func (p *pgEventStore) Load(ctx context.Context, aggregate Aggregate) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Load")
    defer span.Finish()
    span.LogFields(log.String("aggregate", aggregate.String()))

    snapshot, err := p.GetSnapshot(ctx, aggregate.GetID())
    if err != nil && !errors.Is(err, pgx.ErrNoRows) {
        return tracing.TraceWithErr(span, err)
    }

    if snapshot != nil {
        if err := serializer.Unmarshal(snapshot.State, aggregate); err != nil {
            p.log.Errorf("(Load) serializer.Unmarshal err: %v", err)
            return tracing.TraceWithErr(span, errors.Wrap(err, "json.Unmarshal"))
        }

        err := p.loadAggregateEventsByVersion(ctx, aggregate)
        if err != nil {
            return err
        }

        p.log.Debugf("(Load Aggregate By Version) aggregate: %s", aggregate.String())
        span.LogFields(log.String("aggregate with events", aggregate.String()))
        return nil
    }

    err = p.loadEvents(ctx, aggregate)
    if err != nil {
        return err
    }

    p.log.Debugf("(Load Aggregate): aggregate: %s", aggregate.String())
    span.LogFields(log.String("aggregate with events", aggregate.String()))
    return nil
}

// Save es.Aggregate events using snapshots with given frequency
func (p *pgEventStore) Save(ctx context.Context, aggregate Aggregate) (err error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Save")
    defer span.Finish()
    span.LogFields(log.String("aggregate", aggregate.String()))

    if len(aggregate.GetChanges()) == 0 {
        p.log.Debug("(Save) aggregate.GetChanges()) == 0")
        span.LogFields(log.Int("events", len(aggregate.GetChanges())))
        return nil
    }

    tx, err := p.db.Begin(ctx)
    if err != nil {
        p.log.Errorf("(Save) db.Begin err: %v", err)
        return tracing.TraceWithErr(span, errors.Wrap(err, "db.Begin"))
    }

    defer func() {
        if tx != nil {
            if txErr := tx.Rollback(ctx); txErr != nil && !errors.Is(txErr, pgx.ErrTxClosed) {
                err = txErr
                tracing.TraceErr(span, err)
                return
            }
        }
    }()

    changes := aggregate.GetChanges()
    events := make([]Event, 0, len(changes))

    for i := range changes {
        event, err := p.serializer.SerializeEvent(aggregate, changes[i])
        if err != nil {
            p.log.Errorf("(Save) serializer.SerializeEvent err: %v", err)
            return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.SerializeEvent"))
        }
        events = append(events, event)
    }

    if err := p.saveEventsTx(ctx, tx, events); err != nil {
        return tracing.TraceWithErr(span, errors.Wrap(err, "saveEventsTx"))
    }

    if aggregate.GetVersion()%p.cfg.SnapshotFrequency == 0 {
        aggregate.ToSnapshot()
        if err := p.saveSnapshotTx(ctx, tx, aggregate); err != nil {
            return tracing.TraceWithErr(span, errors.Wrap(err, "saveSnapshotTx"))
        }
    }

    if err := p.processEvents(ctx, events); err != nil {
        return tracing.TraceWithErr(span, errors.Wrap(err, "processEvents"))
    }

    p.log.Debugf("(Save Aggregate): aggregate: %s", aggregate.String())
    span.LogFields(log.String("aggregate with events", aggregate.String()))
    return tx.Commit(ctx)
}


Enter fullscreen mode Exit fullscreen mode

对于事件的序列化和反序列化,我们需要实现 Serializer 接口:



type Serializer interface {
    SerializeEvent(aggregate Aggregate, event any) (Event, error)
    DeserializeEvent(event Event) (any, error)
}


Enter fullscreen mode Exit fullscreen mode

银行账户汇总的实现:



type eventSerializer struct {
}

func NewEventSerializer() *eventSerializer {
    return &eventSerializer{}
}

func (s *eventSerializer) SerializeEvent(aggregate es.Aggregate, event any) (es.Event, error) {
    eventBytes, err := serializer.Marshal(event)
    if err != nil {
        return es.Event{}, errors.Wrapf(err, "serializer.Marshal aggregateID: %s", aggregate.GetID())
    }

    switch evt := event.(type) {

    case *events.BankAccountCreatedEventV1:
        return es.NewEvent(aggregate, events.BankAccountCreatedEventType, eventBytes, evt.Metadata), nil

    case *events.BalanceDepositedEventV1:
        return es.NewEvent(aggregate, events.BalanceDepositedEventType, eventBytes, evt.Metadata), nil

    case *events.BalanceWithdrawnEventV1:
        return es.NewEvent(aggregate, events.BalanceWithdrawnEventType, eventBytes, evt.Metadata), nil

    case *events.EmailChangedEventV1:
        return es.NewEvent(aggregate, events.EmailChangedEventType, eventBytes, evt.Metadata), nil

    default:
        return es.Event{}, errors.Wrapf(ErrInvalidEvent, "aggregateID: %s, type: %T", aggregate.GetID(), event)
    }

}

func (s *eventSerializer) DeserializeEvent(event es.Event) (any, error) {
    switch event.GetEventType() {

    case events.BankAccountCreatedEventType:
        return deserializeEvent(event, new(events.BankAccountCreatedEventV1))

    case events.BalanceDepositedEventType:
        return deserializeEvent(event, new(events.BalanceDepositedEventV1))

    case events.BalanceWithdrawnEventType:
        return deserializeEvent(event, new(events.BalanceWithdrawnEventV1))

    case events.EmailChangedEventType:
        return deserializeEvent(event, new(events.EmailChangedEventV1))

    default:
        return nil, errors.Wrapf(ErrInvalidEvent, "type: %s", event.GetEventType())
    }
}



Enter fullscreen mode Exit fullscreen mode

接下来,我们来创建一个银行账户聚合:



const (
    BankAccountAggregateType es.AggregateType = "BankAccount"
)

type BankAccountAggregate struct {
    *es.AggregateBase
    BankAccount *BankAccount
}

func NewBankAccountAggregate(id string) *BankAccountAggregate {
    if id == "" {
        return nil
    }

    bankAccountAggregate := &BankAccountAggregate{BankAccount: NewBankAccount(id)}
    aggregateBase := es.NewAggregateBase(bankAccountAggregate.When)
    aggregateBase.SetType(BankAccountAggregateType)
    aggregateBase.SetID(id)
    bankAccountAggregate.AggregateBase = aggregateBase
    return bankAccountAggregate
}

func (a *BankAccountAggregate) When(event any) error {

    switch evt := event.(type) {

    case *events.BankAccountCreatedEventV1:
        a.BankAccount.Email = evt.Email
        a.BankAccount.Address = evt.Address
        a.BankAccount.Balance = evt.Balance
        a.BankAccount.FirstName = evt.FirstName
        a.BankAccount.LastName = evt.LastName
        a.BankAccount.Status = evt.Status
        return nil

    case *events.BalanceDepositedEventV1:
        return a.BankAccount.DepositBalance(evt.Amount)

    case *events.BalanceWithdrawnEventV1:
        return a.BankAccount.WithdrawBalance(evt.Amount)

    case *events.EmailChangedEventV1:
        a.BankAccount.Email = evt.Email
        return nil

    default:
        return errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "event: %#v", event)
    }
}

func (a *BankAccountAggregate) CreateNewBankAccount(ctx context.Context, email, address, firstName, lastName, status string, amount int64) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.CreateNewBankAccount")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if amount < 0 {
        return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
    }

    metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if err != nil {
        return errors.Wrap(err, "serializer.Marshal")
    }

    event := &events.BankAccountCreatedEventV1{
        Email:     email,
        Address:   address,
        FirstName: firstName,
        LastName:  lastName,
        Balance:   money.New(amount, money.USD),
        Status:    status,
        Metadata:  metaDataBytes,
    }

    return a.Apply(event)
}

func (a *BankAccountAggregate) DepositBalance(ctx context.Context, amount int64, paymentID string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.DepositBalance")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if amount <= 0 {
        return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
    }

    metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if err != nil {
        return errors.Wrap(err, "serializer.Marshal")
    }

    event := &events.BalanceDepositedEventV1{
        Amount:    amount,
        PaymentID: paymentID,
        Metadata:  metaDataBytes,
    }

    return a.Apply(event)
}

func (a *BankAccountAggregate) WithdrawBalance(ctx context.Context, amount int64, paymentID string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.WithdrawBalance")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if amount <= 0 {
        return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
    }

    balance, err := money.New(a.BankAccount.Balance.Amount(), money.USD).Subtract(money.New(amount, money.USD))
    if err != nil {
        return errors.Wrapf(err, "Balance.Subtract amount: %d", amount)
    }

    if balance.IsNegative() {
        return errors.Wrapf(bankAccountErrors.ErrNotEnoughBalance, "amount: %d", amount)
    }

    metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if err != nil {
        return errors.Wrap(err, "serializer.Marshal")
    }

    event := &events.BalanceWithdrawnEventV1{
        Amount:    amount,
        PaymentID: paymentID,
        Metadata:  metaDataBytes,
    }

    return a.Apply(event)
}

func (a *BankAccountAggregate) ChangeEmail(ctx context.Context, email string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.ChangeEmail")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if err != nil {
        return errors.Wrap(err, "serializer.Marshal")
    }

    event := &events.EmailChangedEventV1{Email: email, Metadata: metaDataBytes}

    return a.Apply(event)
}


Enter fullscreen mode Exit fullscreen mode

杰格

我们的微服务接受 HTTP 和 gRPC 请求:
银行账户 REST 控制器接受请求,使用验证器验证请求,
然后调用命令或查询服务。CQRS
之所以流行,主要原因在于它能够
分别处理读取和写入操作,因为这两种操作的优化技术差异巨大。Go
的 http 框架使用了echo

HTTP 处理程序:



func (h *bankAccountHandlers) CreateBankAccount() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.CreateBankAccount")
        defer span.Finish()
        h.metrics.HttpCreateBankAccountRequests.Inc()

        var command commands.CreateBankAccountCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        command.AggregateID = uuid.NewV4().String()

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.CreateBankAccount.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(CreateBankAccount.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(BankAccount created) id: %s", command.AggregateID)
        return c.JSON(http.StatusCreated, command.AggregateID)
    }
}

func (h *bankAccountHandlers) DepositBalance() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.DepositBalance")
        defer span.Finish()
        h.metrics.HttpDepositBalanceRequests.Inc()

        var command commands.DepositBalanceCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        command.AggregateID = c.Param(constants.ID)

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.DepositBalance.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(DepositBalance.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(balance deposited) id: %s, amount: %d", command.AggregateID)
        return c.NoContent(http.StatusOK)
    }
}

func (h *bankAccountHandlers) WithdrawBalance() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.WithdrawBalance")
        defer span.Finish()
        h.metrics.HttpWithdrawBalanceRequests.Inc()

        var command commands.WithdrawBalanceCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        command.AggregateID = c.Param(constants.ID)

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.WithdrawBalance.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(WithdrawBalance.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(balance withdraw) id: %s, amount: %d", command.AggregateID)
        return c.NoContent(http.StatusOK)
    }
}

func (h *bankAccountHandlers) ChangeEmail() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.WithdrawBalance")
        defer span.Finish()
        h.metrics.HttpChangeEmailRequests.Inc()

        var command commands.ChangeEmailCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        command.AggregateID = c.Param(constants.ID)

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.ChangeEmail.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(ChangeEmail.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(balance withdraw) id: %s, amount: %d", command.AggregateID)
        return c.NoContent(http.StatusOK)
    }
}

func (h *bankAccountHandlers) GetByID() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.GetByID")
        defer span.Finish()
        h.metrics.HttpGetBuIdRequests.Inc()

        var query queries.GetBankAccountByIDQuery
        if err := c.Bind(&query); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        query.AggregateID = c.Param(constants.ID)

        fromStore := c.QueryParam("store")
        if fromStore != "" {
            isFromStore, err := strconv.ParseBool(fromStore)
            if err != nil {
                h.log.Errorf("strconv.ParseBool err: %v", tracing.TraceWithErr(span, err))
                return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
            }
            query.FromEventStore = isFromStore
        }

        if err := h.validate.StructCtx(ctx, query); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        bankAccountProjection, err := h.bankAccountService.Queries.GetBankAccountByID.Handle(ctx, query)
        if err != nil {
            h.log.Errorf("(ChangeEmail.Handle) id: %s, err: %v", query.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(get bank account) id: %s", bankAccountProjection.AggregateID)
        return c.JSON(http.StatusOK, mappers.BankAccountMongoProjectionToHttp(bankAccountProjection))
    }
}

func (h *bankAccountHandlers) Search() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.Search")
        defer span.Finish()
        h.metrics.HttpSearchRequests.Inc()

        var query queries.SearchBankAccountsQuery
        if err := c.Bind(&query); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        query.QueryTerm = c.QueryParam("search")
        query.Pagination = utils.NewPaginationFromQueryParams(c.QueryParam(constants.Size), c.QueryParam(constants.Page))

        if err := h.validate.StructCtx(ctx, query); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        searchResult, err := h.bankAccountService.Queries.SearchBankAccounts.Handle(ctx, query)
        if err != nil {
            h.log.Errorf("(SearchBankAccounts.Handle) id: %s, err: %v", query.QueryTerm, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        response := mappers.SearchResultToHttp(searchResult.List, searchResult.PaginationResponse)

        h.log.Infof("(search) result: %+v", response)
        return c.JSON(http.StatusOK, response)
    }
}


Enter fullscreen mode Exit fullscreen mode

普罗米修斯

推荐使用bloomrpc,它是一款优秀的 gRPC 服务图形用户界面客户端。gRPC
服务处理程序:



func (g *grpcService) CreateBankAccount(ctx context.Context, request *bankAccountService.CreateBankAccountRequest) (*bankAccountService.CreateBankAccountResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.CreateBankAccount")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcCreateBankAccountRequests.Inc()

    aggregateID := uuid.NewV4().String()
    command := commands.CreateBankAccountCommand{
        AggregateID: aggregateID,
        Email:       request.GetEmail(),
        Address:     request.GetAddress(),
        FirstName:   request.GetFirstName(),
        LastName:    request.GetLastName(),
        Balance:     request.GetBalance(),
        Status:      request.GetStatus(),
    }

    if err := g.validate.StructCtx(ctx, command); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    err := g.bankAccountService.Commands.CreateBankAccount.Handle(ctx, command)
    if err != nil {
        g.log.Errorf("(CreateBankAccount.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [created account] aggregateID: %s", aggregateID)
    return &bankAccountService.CreateBankAccountResponse{Id: aggregateID}, nil
}

func (g *grpcService) DepositBalance(ctx context.Context, request *bankAccountService.DepositBalanceRequest) (*bankAccountService.DepositBalanceResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.DepositBalance")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcDepositBalanceRequests.Inc()

    command := commands.DepositBalanceCommand{
        AggregateID: request.GetId(),
        Amount:      request.GetAmount(),
        PaymentID:   request.GetPaymentId(),
    }

    if err := g.validate.StructCtx(ctx, command); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    err := g.bankAccountService.Commands.DepositBalance.Handle(ctx, command)
    if err != nil {
        g.log.Errorf("(DepositBalance.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [deposited balance] aggregateID: %s, amount: %v", request.GetId(), request.GetAmount())
    return new(bankAccountService.DepositBalanceResponse), nil
}

func (g *grpcService) WithdrawBalance(ctx context.Context, request *bankAccountService.WithdrawBalanceRequest) (*bankAccountService.WithdrawBalanceResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.WithdrawBalance")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcWithdrawBalanceRequests.Inc()

    command := commands.WithdrawBalanceCommand{
        AggregateID: request.GetId(),
        Amount:      request.GetAmount(),
        PaymentID:   request.GetPaymentId(),
    }

    if err := g.validate.StructCtx(ctx, command); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    err := g.bankAccountService.Commands.WithdrawBalance.Handle(ctx, command)
    if err != nil {
        g.log.Errorf("(WithdrawBalance.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [withdraw balance] aggregateID: %s, amount: %v", request.GetId(), request.GetAmount())
    return new(bankAccountService.WithdrawBalanceResponse), nil
}

func (g *grpcService) ChangeEmail(ctx context.Context, request *bankAccountService.ChangeEmailRequest) (*bankAccountService.ChangeEmailResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.ChangeEmail")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcChangeEmailRequests.Inc()

    command := commands.ChangeEmailCommand{AggregateID: request.GetId(), NewEmail: request.GetEmail()}

    if err := g.validate.StructCtx(ctx, command); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    err := g.bankAccountService.Commands.ChangeEmail.Handle(ctx, command)
    if err != nil {
        g.log.Errorf("(ChangeEmail.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [changed email] aggregateID: %s, newEmail: %s", request.GetId(), request.GetEmail())
    return new(bankAccountService.ChangeEmailResponse), nil
}

func (g *grpcService) GetById(ctx context.Context, request *bankAccountService.GetByIdRequest) (*bankAccountService.GetByIdResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetById")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcGetBuIdRequests.Inc()

    query := queries.GetBankAccountByIDQuery{AggregateID: request.GetId(), FromEventStore: request.IsOwner}

    if err := g.validate.StructCtx(ctx, query); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    bankAccountProjection, err := g.bankAccountService.Queries.GetBankAccountByID.Handle(ctx, query)
    if err != nil {
        g.log.Errorf("(GetBankAccountByID.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [get account by id] projection: %+v", bankAccountProjection)
    return &bankAccountService.GetByIdResponse{BankAccount: mappers.BankAccountMongoProjectionToProto(bankAccountProjection)}, nil
}

func (g *grpcService) SearchBankAccounts(ctx context.Context, request *bankAccountService.SearchBankAccountsRequest) (*bankAccountService.SearchBankAccountsResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.SearchBankAccounts")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcSearchRequests.Inc()

    query := queries.SearchBankAccountsQuery{
        QueryTerm: request.GetSearchText(),
        Pagination: &utils.Pagination{
            Size: int(request.GetSize()),
            Page: int(request.GetPage()),
        },
    }

    if err := g.validate.StructCtx(ctx, query); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    searchQueryResult, err := g.bankAccountService.Queries.SearchBankAccounts.Handle(ctx, query)
    if err != nil {
        g.log.Errorf("(SearchBankAccounts.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [search] result: %+vv", searchQueryResult.PaginationResponse)
    return &bankAccountService.SearchBankAccountsResponse{
        BankAccounts: mappers.SearchBankAccountsListToProto(searchQueryResult.List),
        Pagination:   mappers.PaginationResponseToProto(searchQueryResult.PaginationResponse),
    }, nil
}


Enter fullscreen mode Exit fullscreen mode

命令的主要属性在于,当命令成功执行时,系统会转换到新的状态。
命令处理程序负责处理命令、修改状态或执行其他副作用。
命令服务处理 CQRS 命令,从事件存储中加载聚合数据,并根据应用程序的业务逻辑调用其方法。
聚合数据应用这些更改,然后将这些事件更改列表保存到事件存储中。

杰格

这里创建银行账户的命令很简单,但在现实世界中,业务逻辑当然要复杂得多,我们必须检查电子邮件是否可用等等。



type CreateBankAccountCommand struct {
    AggregateID string `json:"id" validate:"required,gte=0"`
    Email       string `json:"email" validate:"required,gte=0,email"`
    Address     string `json:"address" validate:"required,gte=0"`
    FirstName   string `json:"firstName" validate:"required,gte=0"`
    LastName    string `json:"lastName" validate:"required,gte=0"`
    Balance     int64  `json:"balance" validate:"gte=0"`
    Status      string `json:"status"`
}

type CreateBankAccount interface {
    Handle(ctx context.Context, cmd CreateBankAccountCommand) error
}

type createBankAccountCmdHandler struct {
    log            logger.Logger
    aggregateStore es.AggregateStore
}

func NewCreateBankAccountCmdHandler(log logger.Logger, aggregateStore es.AggregateStore) *createBankAccountCmdHandler {
    return &createBankAccountCmdHandler{log: log, aggregateStore: aggregateStore}
}

func (c *createBankAccountCmdHandler) Handle(ctx context.Context, cmd CreateBankAccountCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createBankAccountCmdHandler.Handle")
    defer span.Finish()
    span.LogFields(log.Object("command", cmd))

    exists, err := c.aggregateStore.Exists(ctx, cmd.AggregateID)
    if err != nil {
        return tracing.TraceWithErr(span, err)
    }
    if exists {
        return tracing.TraceWithErr(span, errors.New("already exists"))
    }

    bankAccountAggregate := domain.NewBankAccountAggregate(cmd.AggregateID)
    err = bankAccountAggregate.CreateNewBankAccount(ctx, cmd.Email, cmd.Address, cmd.FirstName, cmd.LastName, cmd.Status, cmd.Balance)
    if err != nil {
        return tracing.TraceWithErr(span, err)
    }

    return c.aggregateStore.Save(ctx, bankAccountAggregate)
}


Enter fullscreen mode Exit fullscreen mode

杰格

在事件溯源中,投影(也称为视图模型或查询模型)提供了底层基于事件的数据模型的视图。
它们通常代表将源写入模型转换为读取模型的逻辑。
其核心思想是,投影会接收所有能够投影的事件,并
使用读取模型数据库提供的常规 CRUD 操作,对其控制的读取模型执行常规的 CRUD 操作。
投影不仅限于处理单个实体的事件,还可以为多个实体(甚至是不同类型的实体)收集和聚合数据。
事件存储中附加的事件会触发创建或更新读取模型的投影逻辑。
我们可以订阅订单类型流事件的投影。
当我们执行命令时,聚合会生成一个新事件,表示聚合的状态转换。
这些事件会被提交到存储中,因此存储会将它们附加到聚合流的末尾。
投影接收这些事件,并使用 `When` 方法更新其读取模型。与聚合类似,它会根据事件类型应用相应的更改。



func (s *mongoSubscription) ProcessMessagesErrGroup(ctx context.Context, r *kafka.Reader, workerID int) error {

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        m, err := r.FetchMessage(ctx)
        if err != nil {
            s.log.Warnf("(mongoSubscription) workerID: %d, err: %v", workerID, err)
            continue
        }

        switch m.Topic {
        case es.GetTopicName(s.cfg.KafkaPublisherConfig.TopicPrefix, string(domain.BankAccountAggregateType)):
            s.handleBankAccountEvents(ctx, r, m)
        }
    }
}

func (s *mongoSubscription) handleBankAccountEvents(ctx context.Context, r *kafka.Reader, m kafka.Message) {
    ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "mongoSubscription.handleBankAccountEvents")
    defer span.Finish()

    var events []es.Event
    if err := serializer.Unmarshal(m.Value, &events); err != nil {
        s.log.Errorf("serializer.Unmarshal: %v", tracing.TraceWithErr(span, err))
        s.commitErrMessage(ctx, r, m)
        return
    }

    for _, event := range events {
        if err := s.handle(ctx, r, m, event); err != nil {
            return
        }
    }
    s.commitMessage(ctx, r, m)
}

func (s *mongoSubscription) handle(ctx context.Context, r *kafka.Reader, m kafka.Message, event es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoSubscription.handle")
    defer span.Finish()

    err := s.projection.When(ctx, event)
    if err != nil {
        s.log.Errorf("MongoSubscription When err: %v", err)

        recreateErr := s.recreateProjection(ctx, event)
        if recreateErr != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(recreateErr, "recreateProjection err: %v", err))
        }

        s.commitErrMessage(ctx, r, m)
        return tracing.TraceWithErr(span, errors.Wrapf(err, "When type: %s, aggregateID: %s", event.GetEventType(), event.GetAggregateID()))
    }

    s.log.Infof("MongoSubscription <<<commit>>> event: %s", event.String())
    return nil
}


Enter fullscreen mode Exit fullscreen mode

mongo
MongoDB投影通过实现When方法来处理事件,
处理事件并应用诸如聚合之类的更改,然后使用MongoDB存储库保存它:



type bankAccountMongoProjection struct {
    log             logger.Logger
    cfg             *config.Config
    serializer      es.Serializer
    mongoRepository domain.MongoRepository
}

func NewBankAccountMongoProjection(
    log logger.Logger,
    cfg *config.Config,
    serializer es.Serializer,
    mongoRepository domain.MongoRepository,
) *bankAccountMongoProjection {
    return &bankAccountMongoProjection{log: log, cfg: cfg, serializer: serializer, mongoRepository: mongoRepository}
}

func (b *bankAccountMongoProjection) When(ctx context.Context, esEvent es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.When")
    defer span.Finish()

    deserializedEvent, err := b.serializer.DeserializeEvent(esEvent)
    if err != nil {
        return errors.Wrapf(err, "serializer.DeserializeEvent aggregateID: %s, type: %s", esEvent.GetAggregateID(), esEvent.GetEventType())
    }

    switch event := deserializedEvent.(type) {

    case *events.BankAccountCreatedEventV1:
        return b.onBankAccountCreated(ctx, esEvent, event)

    case *events.BalanceDepositedEventV1:
        return b.onBalanceDeposited(ctx, esEvent, event)

    case *events.BalanceWithdrawnEventV1:
        return b.onBalanceWithdrawn(ctx, esEvent, event)

    case *events.EmailChangedEventV1:
        return b.onEmailChanged(ctx, esEvent, event)

    default:
        return errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "esEvent: %s", esEvent.String())
    }
}

func (b *bankAccountMongoProjection) onBankAccountCreated(ctx context.Context, esEvent es.Event, event *events.BankAccountCreatedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBankAccountCreated")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if esEvent.GetVersion() != 1 {
        return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, version: %d", esEvent.GetEventType(), esEvent.GetVersion())
    }

    projection := &domain.BankAccountMongoProjection{
        AggregateID: esEvent.GetAggregateID(),
        Version:     esEvent.GetVersion(),
        Email:       event.Email,
        Address:     event.Address,
        FirstName:   event.FirstName,
        LastName:    event.LastName,
        Balance: domain.Balance{
            Amount:   event.Balance.AsMajorUnits(),
            Currency: event.Balance.Currency().Code,
        },
        Status:    event.Status,
        UpdatedAt: time.Now().UTC(),
        CreatedAt: time.Now().UTC(),
    }

    err := b.mongoRepository.Insert(ctx, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBankAccountCreated] mongoRepository.Insert aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onBankAccountCreated] projection: %#v", projection)
    return nil
}

func (b *bankAccountMongoProjection) onBalanceDeposited(ctx context.Context, esEvent es.Event, event *events.BalanceDepositedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBalanceDeposited")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if err := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), func(projection *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
        projection.Balance.Amount += money.New(event.Amount, money.USD).AsMajorUnits()
        projection.Version = esEvent.GetVersion()
        return projection
    }, esEvent.GetVersion()-1); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onBalanceDeposited] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
    return nil
}

func (b *bankAccountMongoProjection) onBalanceWithdrawn(ctx context.Context, esEvent es.Event, event *events.BalanceWithdrawnEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBalanceWithdrawn")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if err := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), func(projection *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
        projection.Balance.Amount -= money.New(event.Amount, money.USD).AsMajorUnits()
        projection.Version = esEvent.GetVersion()

        return projection
    }, esEvent.GetVersion()-1); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onBalanceWithdrawn] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
    return nil
}

func (b *bankAccountMongoProjection) onEmailChanged(ctx context.Context, esEvent es.Event, event *events.EmailChangedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onEmailChanged")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if err := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), func(projection *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
        projection.Email = event.Email
        projection.Version = esEvent.GetVersion()
        return projection
    }, esEvent.GetVersion()-1); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onEmailChanged] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onEmailChanged] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
    return nil
}


Enter fullscreen mode Exit fullscreen mode

基巴纳

ElasticSearch 投影也做同样的事情,即为文档建立索引以便搜索:



type elasticProjection struct {
    log               logger.Logger
    cfg               *config.Config
    serializer        es.Serializer
    elasticSearchRepo domain.ElasticSearchRepository
}

func NewElasticProjection(log logger.Logger, cfg *config.Config, serializer es.Serializer, elasticSearchRepo domain.ElasticSearchRepository) *elasticProjection {
    return &elasticProjection{log: log, cfg: cfg, serializer: serializer, elasticSearchRepo: elasticSearchRepo}
}

func (e *elasticProjection) When(ctx context.Context, esEvent es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.When")
    defer span.Finish()

    deserializedEvent, err := e.serializer.DeserializeEvent(esEvent)
    if err != nil {
        return errors.Wrapf(err, "serializer.DeserializeEvent aggregateID: %s, type: %s", esEvent.GetAggregateID(), esEvent.GetEventType())
    }

    switch event := deserializedEvent.(type) {

    case *events.BankAccountCreatedEventV1:
        return e.onBankAccountCreated(ctx, esEvent, event)

    case *events.BalanceDepositedEventV1:
        return e.onBalanceDeposited(ctx, esEvent, event)

    case *events.BalanceWithdrawnEventV1:
        return e.onBalanceWithdrawn(ctx, esEvent, event)

    case *events.EmailChangedEventV1:
        return e.onEmailChanged(ctx, esEvent, event)

    default:
        return errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "esEvent: %s", esEvent.String())
    }
}

func (e *elasticProjection) onBankAccountCreated(ctx context.Context, esEvent es.Event, event *events.BankAccountCreatedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBankAccountCreated")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if esEvent.GetVersion() != 1 {
        return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, version: %d", esEvent.GetEventType(), esEvent.GetVersion())
    }

    projection := &domain.ElasticSearchProjection{
        ID:          esEvent.GetAggregateID(),
        AggregateID: esEvent.GetAggregateID(),
        Version:     esEvent.GetVersion(),
        Email:       event.Email,
        Address:     event.Address,
        FirstName:   event.FirstName,
        LastName:    event.LastName,
        Balance: domain.Balance{
            Amount:   event.Balance.AsMajorUnits(),
            Currency: event.Balance.Currency().Code,
        },
        Status:    event.Status,
        UpdatedAt: time.Now().UTC(),
        CreatedAt: time.Now().UTC(),
    }

    err := e.elasticSearchRepo.Index(ctx, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] elasticSearchRepo.Index aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onBankAccountCreated] projection: %s", projection)
    return nil
}

func (e *elasticProjection) onBalanceDeposited(ctx context.Context, esEvent es.Event, event *events.BalanceDepositedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBalanceDeposited")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    projection, err := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
    }

    if err := e.validateEventVersion(projection.Version, esEvent); err != nil {
        return tracing.TraceWithErr(span, err)
    }

    projection.Balance.Amount += money.New(event.Amount, money.USD).AsMajorUnits()
    projection.Version = esEvent.GetVersion()

    if err := e.elasticSearchRepo.Update(ctx, projection); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onBalanceDeposited] projection: %s", projection)
    return nil
}

func (e *elasticProjection) onBalanceWithdrawn(ctx context.Context, esEvent es.Event, event *events.BalanceWithdrawnEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBalanceWithdrawn")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    projection, err := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
    }

    if err := e.validateEventVersion(projection.Version, esEvent); err != nil {
        return tracing.TraceWithErr(span, err)
    }

    projection.Balance.Amount -= money.New(event.Amount, money.USD).AsMajorUnits()
    projection.Version = esEvent.GetVersion()

    if err := e.elasticSearchRepo.Update(ctx, projection); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onBalanceWithdrawn] projection: %s", projection)
    return nil
}

func (e *elasticProjection) onEmailChanged(ctx context.Context, esEvent es.Event, event *events.EmailChangedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onEmailChanged")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    projection, err := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onEmailChanged] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
    }

    if err := e.validateEventVersion(projection.Version, esEvent); err != nil {
        return tracing.TraceWithErr(span, err)
    }

    projection.Email = event.Email
    projection.Version = esEvent.GetVersion()

    if err := e.elasticSearchRepo.Update(ctx, projection); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onEmailChanged] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onEmailChanged] projection: %s", projection)
    return nil
}

func (e *elasticProjection) validateEventVersion(version uint64, esEvent es.Event) error {
    if version != esEvent.GetVersion()-1 {
        return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, eventVersion: %d, projectionVersion: %d", esEvent.GetEventType(), esEvent.GetVersion(), version)
    }
    return nil
}


Enter fullscreen mode Exit fullscreen mode

邮差

在 CQRS 中,查询代表获取数据的意图,并负责返回请求查询的结果。
读取模型可以源自写入模型,但并非必须如此。
它是将业务操作结果转换为可读形式的过程。
事件溯源系统的一大优势在于能够随时创建新的读取模型,
而不会影响其他任何内容。
然后,我们可以使用以下查询来检索投影数据:

布隆普尔

通过 ID 获取银行账户查询可以从 MongoDB 或聚合存储中加载(如果需要):



type GetBankAccountByIDQuery struct {
    AggregateID    string `json:"aggregateID" validate:"required,gte=0"`
    FromEventStore bool   `json:"fromEventStore"`
}

type GetBankAccountByID interface {
    Handle(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error)
}

type getBankAccountByIDQuery struct {
    log             logger.Logger
    aggregateStore  es.AggregateStore
    mongoRepository domain.MongoRepository
}

func NewGetBankAccountByIDQuery(log logger.Logger, aggregateStore es.AggregateStore, mongoRepository domain.MongoRepository) *getBankAccountByIDQuery {
    return &getBankAccountByIDQuery{log: log, aggregateStore: aggregateStore, mongoRepository: mongoRepository}
}

func (q *getBankAccountByIDQuery) Handle(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "getBankAccountByIDQuery.Handle")
    defer span.Finish()
    span.LogFields(log.Object("query", query))

    if query.FromEventStore {
        return q.loadFromAggregateStore(ctx, query)
    }

    projection, err := q.mongoRepository.GetByAggregateID(ctx, query.AggregateID)
    if err != nil {
        if errors.Is(err, mongo.ErrNoDocuments) {
            bankAccountAggregate := domain.NewBankAccountAggregate(query.AggregateID)
            if err = q.aggregateStore.Load(ctx, bankAccountAggregate); err != nil {
                return nil, tracing.TraceWithErr(span, err)
            }
            if bankAccountAggregate.GetVersion() == 0 {
                return nil, tracing.TraceWithErr(span, errors.Wrapf(bankAccountErrors.ErrBankAccountNotFound, "id: %s", query.AggregateID))
            }

            mongoProjection := mappers.BankAccountToMongoProjection(bankAccountAggregate)
            err = q.mongoRepository.Upsert(ctx, mongoProjection)
            if err != nil {
                q.log.Errorf("(GetBankAccountByIDQuery) mongo Upsert AggregateID: %s, err: %v", query.AggregateID, tracing.TraceWithErr(span, err))
            }
            q.log.Debugf("(GetBankAccountByIDQuery) Upsert %+v", query)
            return mongoProjection, nil

        }
        return nil, tracing.TraceWithErr(span, err)
    }

    q.log.Debugf("(GetBankAccountByIDQuery) from mongo %+v", query)
    return projection, nil
}

func (q *getBankAccountByIDQuery) loadFromAggregateStore(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "getBankAccountByIDQuery.loadFromAggregateStore")
    defer span.Finish()

    bankAccountAggregate := domain.NewBankAccountAggregate(query.AggregateID)
    if err := q.aggregateStore.Load(ctx, bankAccountAggregate); err != nil {
        return nil, tracing.TraceWithErr(span, err)
    }
    if bankAccountAggregate.GetVersion() == 0 {
        return nil, tracing.TraceWithErr(span, errors.Wrapf(bankAccountErrors.ErrBankAccountNotFound, "id: %s", query.AggregateID))
    }

    q.log.Debugf("(GetBankAccountByIDQuery) from aggregateStore bankAccountAggregate: %+v", bankAccountAggregate.BankAccount)
    return mappers.BankAccountToMongoProjection(bankAccountAggregate), nil
}


Enter fullscreen mode Exit fullscreen mode

杰格

使用官方MongoDB 客户端的银行账户 MongoDB 存储库方法



type bankAccountMongoRepository struct {
    log logger.Logger
    cfg *config.Config
    db  *mongo.Client
}

func NewBankAccountMongoRepository(log logger.Logger, cfg *config.Config, db *mongo.Client) *bankAccountMongoRepository {
    return &bankAccountMongoRepository{log: log, cfg: cfg, db: db}
}

func (b *bankAccountMongoRepository) Insert(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Insert")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    _, err := b.bankAccountsCollection().InsertOne(ctx, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[InsertOne] AggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[Insert] result AggregateID: %s", projection.AggregateID)
    return nil
}

func (b *bankAccountMongoRepository) Update(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Update")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    projection.ID = ""
    projection.UpdatedAt = time.Now().UTC()

    ops := options.FindOneAndUpdate()
    ops.SetReturnDocument(options.After)
    ops.SetUpsert(false)
    filter := bson.M{constants.MongoAggregateID: projection.AggregateID}

    err := b.bankAccountsCollection().FindOneAndUpdate(ctx, filter, bson.M{"$set": projection}, ops).Decode(projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOneAndUpdate] aggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[Update] result AggregateID: %s", projection.AggregateID)
    return nil
}

func (b *bankAccountMongoRepository) UpdateConcurrently(ctx context.Context, aggregateID string, updateCb domain.UpdateProjectionCallback, expectedVersion uint64) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Update")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    session, err := b.db.StartSession()
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "StartSession aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
    }
    defer session.EndSession(ctx)

    err = mongo.WithSession(ctx, session, func(sessionContext mongo.SessionContext) error {
        if err := session.StartTransaction(); err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "StartTransaction aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }

        filter := bson.M{constants.MongoAggregateID: aggregateID}
        foundProjection := &domain.BankAccountMongoProjection{}

        err := b.bankAccountsCollection().FindOne(ctx, filter).Decode(foundProjection)
        if err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOne] aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }

        if foundProjection.Version != expectedVersion {
            return tracing.TraceWithErr(span, errors.Wrapf(es.ErrInvalidEventVersion, "[FindOne] aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }

        foundProjection = updateCb(foundProjection)

        foundProjection.ID = ""
        foundProjection.UpdatedAt = time.Now().UTC()

        ops := options.FindOneAndUpdate()
        ops.SetReturnDocument(options.After)
        ops.SetUpsert(false)
        filter = bson.M{constants.MongoAggregateID: foundProjection.AggregateID}

        err = b.bankAccountsCollection().FindOneAndUpdate(ctx, filter, bson.M{"$set": foundProjection}, ops).Decode(foundProjection)
        if err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOneAndUpdate] aggregateID: %s, expectedVersion: %d", foundProjection.AggregateID, expectedVersion))
        }

        b.log.Infof("[UpdateConcurrently] result AggregateID: %s, expectedVersion: %d", foundProjection.AggregateID, expectedVersion)
        return session.CommitTransaction(ctx)
    })
    if err != nil {
        if err := session.AbortTransaction(ctx); err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "AbortTransaction aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }
        return tracing.TraceWithErr(span, errors.Wrapf(err, "mongo.WithSession aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
    }

    return nil
}

func (b *bankAccountMongoRepository) Upsert(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Update")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    projection.UpdatedAt = time.Now().UTC()

    ops := options.FindOneAndUpdate()
    ops.SetReturnDocument(options.After)
    ops.SetUpsert(true)
    filter := bson.M{constants.MongoAggregateID: projection.AggregateID}

    err := b.bankAccountsCollection().FindOneAndUpdate(ctx, filter, bson.M{"$set": projection}, ops).Decode(projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "Upsert [FindOneAndUpdate] aggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[Upsert] result AggregateID: %s", projection.AggregateID)
    return nil
}

func (b *bankAccountMongoRepository) DeleteByAggregateID(ctx context.Context, aggregateID string) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.DeleteByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    filter := bson.M{constants.MongoAggregateID: aggregateID}
    ops := options.Delete()

    result, err := b.bankAccountsCollection().DeleteOne(ctx, filter, ops)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "DeleteByAggregateID [FindOneAndDelete] aggregateID: %s", aggregateID))
    }

    b.log.Debugf("[DeleteByAggregateID] result AggregateID: %s, deleteCount: %d", aggregateID, result.DeletedCount)
    return nil
}

func (b *bankAccountMongoRepository) GetByAggregateID(ctx context.Context, aggregateID string) (*domain.BankAccountMongoProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.GetByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    filter := bson.M{constants.MongoAggregateID: aggregateID}
    var projection domain.BankAccountMongoProjection

    err := b.bankAccountsCollection().FindOne(ctx, filter).Decode(&projection)
    if err != nil {
        return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOne] aggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[GetByAggregateID] result projection: %+v", projection)
    return &projection, nil
}

func (b *bankAccountMongoRepository) bankAccountsCollection() *mongo.Collection {
    return b.db.Database(b.cfg.Mongo.Db).Collection(b.cfg.MongoCollections.BankAccounts)
}


Enter fullscreen mode Exit fullscreen mode

ElasticSearch 存储库的实现使用了go-elasticsearch官方库,
另一个不错的库是olivere elastic,但它不支持本项目使用的 8 版本。



type elasticRepo struct {
    log    logger.Logger
    cfg    *config.Config
    client *elasticsearch.Client
}

func NewElasticRepository(log logger.Logger, cfg *config.Config, client *elasticsearch.Client) *elasticRepo {
    return &elasticRepo{log: log, cfg: cfg, client: client}
}

func (e *elasticRepo) Index(ctx context.Context, projection *domain.ElasticSearchProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Index")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    response, err := esclient.Index(ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, projection.AggregateID, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.Index id: %s", projection.AggregateID))
    }
    defer response.Body.Close()

    if response.IsError() {
        return tracing.TraceWithErr(span, errors.Wrapf(errors.New("ElasticSearch request err"), "response.IsError id: %s", projection.AggregateID))
    }
    if response.HasWarnings() {
        e.log.Warnf("ElasticSearch Index warnings: %+v", response.Warnings())
    }

    e.log.Infof("ElasticSearch index result: %s", response.String())
    return nil
}

func (e *elasticRepo) Update(ctx context.Context, projection *domain.ElasticSearchProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Update")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    projection.UpdatedAt = time.Now().UTC()

    response, err := esclient.Update(ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, projection.AggregateID, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.Update id: %s", projection.AggregateID))
    }
    defer response.Body.Close()

    if response.IsError() {
        return tracing.TraceWithErr(span, errors.Wrapf(errors.New("ElasticSearch request err"), "response.IsError id: %s", projection.AggregateID))
    }
    if response.HasWarnings() {
        e.log.Warnf("ElasticSearch Update warnings: %+v", response.Warnings())
    }

    e.log.Infof("ElasticSearch update result: %s", response.String())
    return nil
}

func (e *elasticRepo) DeleteByAggregateID(ctx context.Context, aggregateID string) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.DeleteByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    response, err := esclient.Delete(ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, aggregateID)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.Delete id: %s", aggregateID))
    }
    defer response.Body.Close()

    if response.IsError() && response.StatusCode != http.StatusNotFound {
        return tracing.TraceWithErr(span, errors.Wrapf(errors.New("ElasticSearch delete"), "response.IsError aggregateID: %s, status: %s", aggregateID, response.Status()))
    }
    if response.HasWarnings() {
        e.log.Warnf("ElasticSearch Delete warnings: %+v", response.Warnings())
    }

    e.log.Infof("ElasticSearch delete result: %s", response.String())
    return nil
}

func (e *elasticRepo) GetByAggregateID(ctx context.Context, aggregateID string) (*domain.ElasticSearchProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.GetByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    response, err := esclient.GetByID[*domain.ElasticSearchProjection](ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, aggregateID)
    if err != nil {
        return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.GetByID id: %s", aggregateID))
    }

    e.log.Infof("ElasticSearch delete result: %+v", response)
    return response.Source, nil
}

func (e *elasticRepo) Search(ctx context.Context, term string, options esclient.SearchOptions) (*esclient.SearchListResponse[*domain.ElasticSearchProjection], error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Search")
    defer span.Finish()
    span.LogFields(log.String("term", term))

    searchMatchPrefixRequest := esclient.SearchMatchPrefixRequest{
        Index:   []string{e.cfg.ElasticIndexes.BankAccounts},
        Term:    term,
        Size:    options.Size,
        From:    options.From,
        Sort:    []string{"balance.amount"},
        Fields:  options.Fields,
        SortMap: map[string]interface{}{"balance.amount": "asc"},
    }

    if options.Sort != nil {
        searchMatchPrefixRequest.Sort = options.Sort
    }

    response, err := esclient.SearchMultiMatchPrefix[*domain.ElasticSearchProjection](ctx, e.client, searchMatchPrefixRequest)
    if err != nil {
        return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.SearchMultiMatchPrefix term: %s", term))
    }

    return response, nil
}


Enter fullscreen mode Exit fullscreen mode

您可以在这里找到更多详细信息和完整项目的源代码。当然
在实际应用中,业务逻辑和基础设施代码要复杂得多,我们需要实现更多必要的功能。
希望这篇文章对您有所帮助,欢迎您提出任何反馈或问题,请随时通过电子邮件或任何即时通讯工具与我联系 :)

文章来源:https://dev.to/aleksk1ng/go-eventsource-and-cqrs-with-postgresql-kafka-mongodb-and-elasticsearch-44d7