Published on 2026-01-06

Java Spring EventSourcing and CQRS Clean Architecture microservice 👋⚡️💫

In this article, let's try to build a closer-to-real-world Event Sourcing and CQRS microservice using: 🚀👨‍💻🙌

👨‍💻 Full list of what has been used:

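The source code is available in the GitHub repository.
The main idea of this project is to implement Event Sourcing and CQRS using Java, Spring, and an EventStore built on PostgreSQL.
I previously wrote a similar article that implemented the same functionality with Go and EventStoreDB.
I think EventStoreDB is the best choice for event sourcing, but in real projects we usually face business restrictions; for example, using EventStoreDB may not be allowed.
In that case, I think PostgreSQL and Kafka are a good alternative for implementing a custom event store.
This article does not cover the Event Sourcing and CQRS patterns in detail. The best place to read about them is microservices.io, and the EventStoreDB blog and documentation are very good too.
As mentioned in the previous article, I highly recommend Alexey Zimarev's book "Hands-On Domain-Driven Design with .NET Core" and his blog.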

In this project, the microservice's event store is implemented with PostgreSQL, Spring Data JPA, and Kafka. Projections use Spring Data MongoDB, and communication is over REST.

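No interesting business logic is implemented here, and there are no tests, because I ran out of time.
The list of events is simplified too: create a new bank account, change the email or address, and deposit money.
Of course, in the real world it is better to use more concrete and meaningful events, but the goal here is to show the idea and how it works.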

All UIs are available on the following ports:

Swagger UI: http://localhost:8006/swagger-ui/index.html

Jaeger UI: http://localhost:16686

Prometheus UI: http://localhost:9090

Grafana UI: http://localhost:3005

The Docker Compose file for this project:



version: "3.9"

services:
  postgresql:
    image: postgres:14.2
    container_name: postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=microservices
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  zookeeper:
    image: 'bitnami/zookeeper:3.8.0'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - "./zookeeper:/zookeeper"
    networks: [ "microservices" ]

  kafka:
    image: 'bitnami/kafka:3.0.1'
    ports:
      - "9092:9092"
      - "9093:9093"
    volumes:
      - "./kafka_data:/bitnami"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    networks: [ "microservices" ]

  redis:
    image: redis:6-alpine
    restart: always
    container_name: microservices_redis
    ports:
      - "6379:6379"
    networks: [ "microservices" ]

  mongodb:
    image: docker.io/bitnami/mongodb:4.4
    restart: always
    container_name: microservices_mongo
    environment:
      MONGODB_ROOT_USER: admin
      MONGODB_ROOT_PASSWORD: admin
      BITNAMI_DEBUG: "false"
      ALLOW_EMPTY_PASSWORD: "no"
      MONGODB_SYSTEM_LOG_VERBOSITY: "0"
      MONGODB_DISABLE_SYSTEM_LOG: "no"
      MONGODB_DISABLE_JAVASCRIPT: "no"
      MONGODB_ENABLE_JOURNAL: "yes"
      MONGODB_ENABLE_IPV6: "no"
      MONGODB_ENABLE_DIRECTORY_PER_DB: "no"
      MONGODB_DATABASE: "microservices"
    volumes:
      - ./mongodb_data_container:/data/db
    ports:
      - "27017:27017"
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

networks:
  microservices:
    name: microservices



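AggregateRoot can be implemented in different ways; its main methods are load (replay events), apply, and raise changes.
When we fetch an aggregate from the database, we do not read its state as a single record in a table or document. Instead, we read all the events that were saved before and call the when method for each of them.
After these steps, we have replayed the entire history of the given aggregate, which brings it to its latest state.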



@Data
@NoArgsConstructor
public abstract class AggregateRoot {

    protected String id;
    protected String type;
    protected long version;
    protected final List<Event> changes = new ArrayList<>();

    public AggregateRoot(final String id, final String aggregateType) {
        this.id = id;
        this.type = aggregateType;
    }


    public abstract void when(final Event event);

    public void load(final List<Event> events) {
        // raiseEvent already validates, applies, and bumps the version once per event
        events.forEach(this::raiseEvent);
    }

    public void apply(final Event event) {
        this.validateEvent(event);
        event.setAggregateType(this.type);

        when(event);
        changes.add(event);

        this.version++;
        event.setVersion(this.version);
    }

    public void raiseEvent(final Event event) {
        this.validateEvent(event);

        event.setAggregateType(this.type);
        when(event);

        this.version++;
    }

    public void clearChanges() {
        this.changes.clear();
    }

    private void validateEvent(final Event event) {
        // String.valueOf avoids a NullPointerException when the event itself is null
        if (Objects.isNull(event) || !event.getAggregateId().equals(this.id))
            throw new InvalidEventException(String.valueOf(event));
    }
}
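
To make the replay idea concrete, here is a minimal, self-contained sketch that rebuilds an account balance by folding over stored events. The names `ReplaySketch`, `Deposited`, and `Account` are hypothetical stand-ins, not classes from the project; real code would go through the `AggregateRoot` and `Event` classes shown in this article.

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical, simplified stand-ins for the article's Event and aggregate classes.
class ReplaySketch {

    // A stored fact: some amount was deposited at a given version.
    record Deposited(long version, BigDecimal amount) {}

    static class Account {
        BigDecimal balance = BigDecimal.ZERO;
        long version = 0;

        // Plays the role of when(): mutate state from a single event.
        void when(Deposited event) {
            balance = balance.add(event.amount());
        }

        // Plays the role of load(): replay history in order, one version per event.
        void load(List<Deposited> history) {
            for (Deposited event : history) {
                when(event);
                version++;
            }
        }
    }

    // Builds a fresh aggregate and brings it to its latest state.
    static Account rehydrate(List<Deposited> history) {
        Account account = new Account();
        account.load(history);
        return account;
    }
}
```

The aggregate never stores the balance directly; it is always the result of applying every event in order, so the version ends up equal to the number of replayed events.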



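Events represent facts that have happened in the domain. They are the source of truth; your current state is derived from them.
Events are immutable and represent business facts.
This means we never change or delete anything in the database; we only append new events.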



@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Event {
    private UUID id;
    private String aggregateId;
    private String eventType;
    private String aggregateType;
    private long version;
    private byte[] data;
    private byte[] metaData;
    private LocalDateTime timeStamp;


    public Event(String eventType, String aggregateType) {
        this.id = UUID.randomUUID();
        this.eventType = eventType;
        this.aggregateType = aggregateType;
        this.timeStamp = LocalDateTime.now();
    }
}
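
The append-only rule can be sketched with an in-memory log. This is only an illustration under assumptions: `AppendOnlyLogSketch` and `StoredEvent` are hypothetical names, and the project itself persists events in PostgreSQL with the mutable `Event` class above rather than a record.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch: an append-only, in-memory log of immutable events.
class AppendOnlyLogSketch {

    // A record has no setters, so a stored event cannot be modified afterwards.
    record StoredEvent(UUID id, String aggregateId, String eventType, long version, Instant timeStamp) {}

    private final List<StoredEvent> events = new ArrayList<>();

    // The only write operation: add a new fact to the end of the log.
    StoredEvent append(String aggregateId, String eventType) {
        long nextVersion = events.stream()
                .filter(e -> e.aggregateId().equals(aggregateId))
                .count() + 1;
        StoredEvent event = new StoredEvent(UUID.randomUUID(), aggregateId, eventType, nextVersion, Instant.now());
        events.add(event);
        return event;
    }

    // Readers get a read-only view; there is no update or delete operation.
    List<StoredEvent> readAll() {
        return Collections.unmodifiableList(events);
    }
}
```

Correcting a mistake in such a model means appending a compensating event, not editing an old one.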



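If we follow the event sourcing pattern strictly, we have to fetch all of these transactions to calculate the current account balance.
That is inefficient. The first idea that comes to mind for making it more efficient is to cache the latest state somewhere.
Instead of retrieving all these events, we retrieve a single record and use it in our business logic. This is a snapshot.
In this microservice, we create a snapshot of the aggregate at every N-th version and save it to PostgreSQL.
When we load an aggregate from the event store, we first load the snapshot and then load only the events with a version greater than the snapshot's.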



@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Snapshot {

    private UUID id;
    private String aggregateId;
    private String aggregateType;
    private byte[] data;
    private byte[] metaData;
    private long version;
    private LocalDateTime timeStamp;
}
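
The loading rule described above (snapshot first, then only newer events) can be sketched as follows. The names `SnapshotLoadSketch`, `BalanceSnapshot`, and `Loaded` are hypothetical, and the sketch keeps everything in memory, whereas the project reads both the snapshot and the events from PostgreSQL.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of "load the snapshot, then replay only newer events".
class SnapshotLoadSketch {

    record Deposited(long version, BigDecimal amount) {}

    // Cached state of the aggregate as of a given version.
    record BalanceSnapshot(long version, BigDecimal balance) {}

    // Result of loading, including how many events had to be replayed.
    record Loaded(long version, BigDecimal balance, int replayedEvents) {}

    // Assumes allEvents is sorted by version in ascending order.
    static Loaded load(Optional<BalanceSnapshot> snapshot, List<Deposited> allEvents) {
        // Start from the snapshot if present, otherwise from the empty state.
        long version = snapshot.map(BalanceSnapshot::version).orElse(0L);
        BigDecimal balance = snapshot.map(BalanceSnapshot::balance).orElse(BigDecimal.ZERO);
        int replayed = 0;

        for (Deposited event : allEvents) {
            if (event.version() <= version) continue; // already reflected in the snapshot
            balance = balance.add(event.amount());
            version = event.version();
            replayed++;
        }
        return new Loaded(version, balance, replayed);
    }
}
```

With a snapshot at version 3 and four stored events, only one event is replayed, yet the resulting state is the same as a full replay.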



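The AggregateStore implementation provides Load, Save, and Exists methods.
Load and Save accept an aggregate and then load or apply events using the EventStoreDB client.
The Load method finds the stream name for the aggregate, reads all events from that aggregate stream, loops through them, and calls the RaiseEvent handler for each one.
The Save method persists the aggregate by saving its history of changes, and it handles concurrency.
When you retrieve a stream from EventStoreDB, you note the current version number; when you save it back, you can then determine whether somebody else has modified the record in the meantime.
The PostgreSQL event store implementation looks like this: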



@Repository
@RequiredArgsConstructor
@Slf4j
public class EventStore implements EventStoreDB {

    public static final int SNAPSHOT_FREQUENCY = 3;
    private static final String SAVE_EVENTS_QUERY = "INSERT INTO events (aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp) values (:aggregate_id, :aggregate_type, :event_type, :data, :metadata, :version, now())";
    private static final String LOAD_EVENTS_QUERY = "SELECT event_id ,aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp FROM events e WHERE e.aggregate_id = :aggregate_id AND e.version > :version ORDER BY e.version ASC";
    private static final String SAVE_SNAPSHOT_QUERY = "INSERT INTO snapshots (aggregate_id, aggregate_type, data, metadata, version, timestamp) VALUES (:aggregate_id, :aggregate_type, :data, :metadata, :version, now()) ON CONFLICT (aggregate_id) DO UPDATE SET data = :data, version = :version, timestamp = now()";
    private static final String HANDLE_CONCURRENCY_QUERY = "SELECT aggregate_id FROM events e WHERE e.aggregate_id = :aggregate_id LIMIT 1 FOR UPDATE";
    private static final String LOAD_SNAPSHOT_QUERY = "SELECT aggregate_id, aggregate_type, data, metadata, version, timestamp FROM snapshots s WHERE s.aggregate_id = :aggregate_id";
    private static final String EXISTS_QUERY = "SELECT aggregate_id FROM events e WHERE e.aggregate_id = :aggregate_id LIMIT 1";

    private final NamedParameterJdbcTemplate jdbcTemplate;
    private final EventBus eventBus;


    @Override
    @Transactional
    @NewSpan
    public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
        final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());

        if (aggregate.getVersion() > 1) {
            this.handleConcurrency(aggregate.getId());
        }

        this.saveEvents(aggregate.getChanges());
        if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) {
            this.saveSnapshot(aggregate);
        }

        eventBus.publish(aggregateEvents);

        log.info("(save) saved aggregate: {}", aggregate);
    }

    @Override
    @Transactional(readOnly = true)
    @NewSpan
    public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {

        final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);

        final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType);

        final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
        events.forEach(event -> {
            aggregate.raiseEvent(event);
            log.info("raise event version: {}", event.getVersion());
        });

        if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId);

        log.info("(load) loaded aggregate: {}", aggregate);
        return aggregate;
    }

    @Override
    @NewSpan
    public void saveEvents(@SpanTag("events") List<Event> events) {
        if (events.isEmpty()) return;

        final List<Event> changes = new ArrayList<>(events);
        if (changes.size() > 1) {
            this.eventsBatchInsert(changes);
            return;
        }

        final Event event = changes.get(0);
        int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, mapFromEvent(event));
        log.info("(saveEvents) saved result: {}, event: {}", result, event);
    }

    private Map<String, Serializable> mapFromEvent(Event event) {
        return Map.of(
                AGGREGATE_ID, event.getAggregateId(),
                AGGREGATE_TYPE, event.getAggregateType(),
                EVENT_TYPE, event.getEventType(),
                DATA, Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
                METADATA, Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
                VERSION, event.getVersion());
    }


    @NewSpan
    private void eventsBatchInsert(@SpanTag("events") List<Event> events) {
        final var args = events.stream().map(this::mapFromEvent).toList();
        final Map<String, ?>[] maps = args.toArray(new Map[0]);
        int[] ints = jdbcTemplate.batchUpdate(SAVE_EVENTS_QUERY, maps);
        log.info("(saveEvents) BATCH saved result: {}, events: {}", java.util.Arrays.toString(ints), events);
    }

    @Override
    @NewSpan
    public List<Event> loadEvents(@SpanTag("aggregateId") String aggregateId, @SpanTag("version") long version) {
        return jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of(AGGREGATE_ID, aggregateId, VERSION, version),
                (rs, rowNum) -> Event.builder()
                        .aggregateId(rs.getString(AGGREGATE_ID))
                        .aggregateType(rs.getString(AGGREGATE_TYPE))
                        .eventType(rs.getString(EVENT_TYPE))
                        .data(rs.getBytes(DATA))
                        .metaData(rs.getBytes(METADATA))
                        .version(rs.getLong(VERSION))
                        .timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime())
                        .build());
    }

    @NewSpan
    private <T extends AggregateRoot> void saveSnapshot(@SpanTag("aggregate") T aggregate) {
        aggregate.toSnapshot();
        final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);

        int updateResult = jdbcTemplate.update(SAVE_SNAPSHOT_QUERY,
                Map.of(AGGREGATE_ID, snapshot.getAggregateId(),
                        AGGREGATE_TYPE, snapshot.getAggregateType(),
                        DATA, Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(),
                        METADATA, Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(),
                        VERSION, snapshot.getVersion()));

        log.info("(saveSnapshot) updateResult: {}", updateResult);
    }


    @NewSpan
    private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
        try {
            String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class);
            log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
        } catch (EmptyResultDataAccessException e) {
            log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage());
        }
        log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId);
    }

    @NewSpan
    private Optional<Snapshot> loadSnapshot(@SpanTag("aggregateId") String aggregateId) {
        return jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of(AGGREGATE_ID, aggregateId), (rs, rowNum) -> Snapshot.builder()
                .aggregateId(rs.getString(AGGREGATE_ID))
                .aggregateType(rs.getString(AGGREGATE_TYPE))
                .data(rs.getBytes(DATA))
                .metaData(rs.getBytes(METADATA))
                .version(rs.getLong(VERSION))
                .timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime())
                .build()).stream().findFirst();
    }

    @NewSpan
    private <T extends AggregateRoot> T getAggregate(@SpanTag("aggregateId") final String aggregateId, @SpanTag("aggregateType") final Class<T> aggregateType) {
        try {
            return aggregateType.getConstructor(String.class).newInstance(aggregateId);
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    @NewSpan
    private <T extends AggregateRoot> T getSnapshotFromClass(@SpanTag("snapshot") Optional<Snapshot> snapshot, @SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
        if (snapshot.isEmpty()) {
            final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
            return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType);
        }
        return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType);
    }


    @Override
    @NewSpan
    public Boolean exists(@SpanTag("aggregateId") String aggregateId) {
        try {
            final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class);
            log.info("aggregate exists id: {}", id);
            return true;
        } catch (Exception ex) {
            if (!(ex instanceof EmptyResultDataAccessException)) {
                throw new RuntimeException(ex);
            }
            return false;
        }
    }
}
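
The version check described in the text (note the version on read, compare it on save) can be sketched in memory as below. This is not how the store above does it: the PostgreSQL implementation takes a row lock with `SELECT ... FOR UPDATE` instead. The names `ExpectedVersionSketch` and `ConcurrencyException` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of an expected-version check on save.
class ExpectedVersionSketch {

    static class ConcurrencyException extends RuntimeException {
        ConcurrencyException(String message) {
            super(message);
        }
    }

    private final Map<String, List<String>> streams = new HashMap<>();

    // The current version of a stream is the number of events stored in it.
    synchronized long currentVersion(String aggregateId) {
        return streams.getOrDefault(aggregateId, List.of()).size();
    }

    // Appends only if nobody else has written since the caller read expectedVersion.
    synchronized long append(String aggregateId, long expectedVersion, List<String> newEvents) {
        List<String> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
        if (stream.size() != expectedVersion) {
            throw new ConcurrencyException(
                    "expected version " + expectedVersion + " but stream is at " + stream.size());
        }
        stream.addAll(newEvents);
        return stream.size();
    }
}
```

A caller that loses the race gets an exception and can reload the aggregate and retry the command, rather than silently overwriting someone else's change.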



Next, let's create a bank account aggregate:



@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class BankAccountAggregate extends AggregateRoot {


    public static final String AGGREGATE_TYPE = "BankAccountAggregate";

    public BankAccountAggregate(String id) {
        super(id, AGGREGATE_TYPE);
    }

    private String email;
    private String userName;
    private String address;
    private BigDecimal balance;


    @Override
    public void when(Event event) {
        switch (event.getEventType()) {
            case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class));
            case EmailChangedEvent.EMAIL_CHANGED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class));
            case AddressUpdatedEvent.ADDRESS_UPDATED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
            case BalanceDepositedEvent.BALANCE_DEPOSITED ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
            default -> throw new InvalidEventTypeException(event.getEventType());
        }
    }

    private void handle(final BankAccountCreatedEvent event) {
        this.email = event.getEmail();
        this.userName = event.getUserName();
        this.address = event.getAddress();
        this.balance = BigDecimal.valueOf(0);
    }

    private void handle(final EmailChangedEvent event) {
        Objects.requireNonNull(event.getNewEmail());
        if (event.getNewEmail().isBlank()) throw new InvalidEmailException();
        this.email = event.getNewEmail();
    }

    private void handle(final AddressUpdatedEvent event) {
        Objects.requireNonNull(event.getNewAddress());
        if (event.getNewAddress().isBlank()) throw new InvalidAddressException();
        this.address = event.getNewAddress();
    }

    private void handle(final BalanceDepositedEvent event) {
        Objects.requireNonNull(event.getAmount());
        this.balance = this.balance.add(event.getAmount());
    }

    public void createBankAccount(String email, String address, String userName) {
        final var data = BankAccountCreatedEvent.builder()
                .aggregateId(id)
                .email(email)
                .address(address)
                .userName(userName)
                .build();

        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1, dataBytes, null);
        this.apply(event);
    }

    public void changeEmail(String email) {
        final var data = EmailChangedEvent.builder().aggregateId(id).newEmail(email).build();
        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(EmailChangedEvent.EMAIL_CHANGED_V1, dataBytes, null);
        apply(event);

    }

    public void changeAddress(String newAddress) {
        final var data = AddressUpdatedEvent.builder().aggregateId(id).newAddress(newAddress).build();
        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(AddressUpdatedEvent.ADDRESS_UPDATED_V1, dataBytes, null);
        apply(event);
    }

    public void depositBalance(BigDecimal amount) {
        final var data = BalanceDepositedEvent.builder().aggregateId(id).amount(amount).build();
        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(BalanceDepositedEvent.BALANCE_DEPOSITED, dataBytes, null);
        apply(event);
    }
}



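Our microservice accepts HTTP requests; the API is documented with Swagger OpenAPI 3. The bank account REST controller accepts requests, validates them using Hibernate Validator, and then calls the command or query service. The main reason CQRS is gaining popularity is that it handles reads and writes separately, because the optimization techniques for the two differ greatly.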






@RestController
@RequestMapping(path = "/api/v1/bank")
@Slf4j
@RequiredArgsConstructor
public class BankAccountController {

    private final BankAccountCommandService commandService;
    private final BankAccountQueryService queryService;

    @GetMapping("{aggregateId}")
    public ResponseEntity<BankAccountResponseDTO> getBankAccount(@PathVariable String aggregateId) {
        final var result = queryService.handle(new GetBankAccountByIDQuery(aggregateId));
        log.info("GET bank account result: {}", result);
        return ResponseEntity.ok(result);
    }

    @PostMapping
    public ResponseEntity<String> createBankAccount(@Valid @RequestBody CreateBankAccountRequestDTO dto) {
        final var aggregateID = UUID.randomUUID().toString();
        final var id = commandService.handle(new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address()));
        log.info("CREATE bank account id: {}", id);
        return ResponseEntity.status(HttpStatus.CREATED).body(id);
    }

    @PostMapping(path = "/deposit/{aggregateId}")
    public ResponseEntity<Void> depositAmount(@Valid @RequestBody DepositAmountRequestDTO dto, @PathVariable String aggregateId) {
        commandService.handle(new DepositAmountCommand(aggregateId, dto.amount()));
        return ResponseEntity.ok().build();
    }

    @PostMapping(path = "/email/{aggregateId}")
    public ResponseEntity<Void> changeEmail(@Valid @RequestBody ChangeEmailRequestDTO dto, @PathVariable String aggregateId) {
        commandService.handle(new ChangeEmailCommand(aggregateId, dto.newEmail()));
        return ResponseEntity.ok().build();
    }

    @PostMapping(path = "/address/{aggregateId}")
    public ResponseEntity<Void> changeAddress(@Valid @RequestBody ChangeAddressRequestDTO dto, @PathVariable String aggregateId) {
        commandService.handle(new ChangeAddressCommand(aggregateId, dto.newAddress()));
        return ResponseEntity.ok().build();
    }

    @GetMapping("/balance")
    public ResponseEntity<Page<BankAccountResponseDTO>> getAllOrderByBalance(@RequestParam(name = "page", defaultValue = "0") Integer page,
                                                                             @RequestParam(name = "size", defaultValue = "10") Integer size) {

        final var result = queryService.handle(new FindAllOrderByBalance(page, size));
        log.info("GET all by balance result: {}", result);
        return ResponseEntity.ok(result);
    }
}



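The command service handles CQRS commands, loads the aggregate from the event store, and calls its methods according to the application's business logic.
The aggregate applies the changes, and then we save the resulting list of events to the event store.
For microservice resilience, Resilience4j is used here.
It can be configured in the properties file. That is a very useful feature, because in real-world applications we often need to change such settings through Kubernetes YAML files.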



@RequiredArgsConstructor
@Slf4j
@Service
public class BankAccountCommandHandler implements BankAccountCommandService {

    private final EventStoreDB eventStoreDB;
    private static final String SERVICE_NAME = "microservice";

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public String handle(@SpanTag("command") CreateBankAccountCommand command) {
        final var aggregate = new BankAccountAggregate(command.aggregateID());
        aggregate.createBankAccount(command.email(), command.address(), command.userName());
        eventStoreDB.save(aggregate);

        log.info("(CreateBankAccountCommand) aggregate: {}", aggregate);
        return aggregate.getId();
    }

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public void handle(@SpanTag("command") ChangeEmailCommand command) {
        final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
        aggregate.changeEmail(command.newEmail());
        eventStoreDB.save(aggregate);
        log.info("(ChangeEmailCommand) aggregate: {}", aggregate);
    }

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public void handle(@SpanTag("command") ChangeAddressCommand command) {
        final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
        aggregate.changeAddress(command.newAddress());
        eventStoreDB.save(aggregate);
        log.info("(ChangeAddressCommand) aggregate: {}", aggregate);
    }

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public void handle(@SpanTag("command") DepositAmountCommand command) {
        final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
        aggregate.depositBalance(command.amount());
        eventStoreDB.save(aggregate);
        log.info("(DepositAmountCommand) aggregate: {}", aggregate);
    }
}
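
As a rough mental model of what a retry policy does around `eventStoreDB.save(...)`, here is a plain-Java sketch. It is not Resilience4j's implementation or API: the `RetrySketch.withRetry` helper is hypothetical, and it omits wait intervals, exception filters, and the circuit breaker entirely.

```java
import java.util.function.Supplier;

// Hypothetical sketch of retry semantics; not how Resilience4j is implemented.
class RetrySketch {

    // Calls the action up to maxAttempts times and rethrows the last failure.
    static <T> T withRetry(int maxAttempts, Supplier<T> action) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

Retrying a command is only safe when the operation is idempotent or protected by a concurrency check, since a retried save could otherwise append the same event twice.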



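The process of building state from events is called a projection.
We can subscribe to the event stream to build projections.
When we execute a command, the aggregate produces a new event that represents a state transition of the aggregate.
These events are committed to the store, which appends them to the end of the aggregate stream.
A projection receives these events and updates its read model using the `When` method. Like the aggregate, the projection applies the appropriate change depending on the event type: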



@Service
@Slf4j
@RequiredArgsConstructor
public class BankAccountMongoProjection implements Projection {

    private final BankAccountMongoRepository mongoRepository;
    private final EventStoreDB eventStoreDB;
    private static final String SERVICE_NAME = "microservice";


    @KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
            groupId = "${microservice.kafka.groupId}",
            concurrency = "${microservice.kafka.default-concurrency}")
    public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) {
        log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}, data: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), new String(data));

        try {
            final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data);
            this.processEvents(Arrays.stream(events).toList());
            ack.acknowledge();
            log.info("ack events: {}", Arrays.toString(events));
        } catch (Exception ex) {
            ack.nack(100);
            log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), ex);
        }
    }

    @NewSpan
    private void processEvents(@SpanTag("events") List<Event> events) {
        if (events.isEmpty()) return;

        try {
            events.forEach(this::when);
        } catch (Exception ex) {
            mongoRepository.deleteByAggregateId(events.get(0).getAggregateId());
            final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class);
            final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate);
            final var result = mongoRepository.save(document);
            log.info("(processEvents) saved document: {}", result);
        }
    }

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public void when(@SpanTag("event") Event event) {
        final var aggregateId = event.getAggregateId();
        log.info("(when) >>>>> aggregateId: {}", aggregateId);

        switch (event.getEventType()) {
            case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class));
            case EmailChangedEvent.EMAIL_CHANGED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class));
            case AddressUpdatedEvent.ADDRESS_UPDATED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
            case BalanceDepositedEvent.BALANCE_DEPOSITED ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
            default -> log.error("unknown event type: {}", event.getEventType());
        }
    }


    @NewSpan
    private void handle(@SpanTag("event") BankAccountCreatedEvent event) {
        log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId());

        final var document = BankAccountDocument.builder()
                .aggregateId(event.getAggregateId())
                .email(event.getEmail())
                .address(event.getAddress())
                .userName(event.getUserName())
                .balance(BigDecimal.valueOf(0))
                .build();

        final var insert = mongoRepository.insert(document);
        log.info("(BankAccountCreatedEvent) insert: {}", insert);
    }

    @NewSpan
    private void handle(@SpanTag("event") EmailChangedEvent event) {
        log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
        final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
        if (documentOptional.isEmpty())
            throw new BankAccountDocumentNotFoundException(event.getAggregateId());

        final var document = documentOptional.get();
        document.setEmail(event.getNewEmail());
        mongoRepository.save(document);
    }

    @NewSpan
    private void handle(@SpanTag("event") AddressUpdatedEvent event) {
        log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
        final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
        if (documentOptional.isEmpty())
            throw new BankAccountDocumentNotFoundException(event.getAggregateId());

        final var document = documentOptional.get();
        document.setAddress(event.getNewAddress());
        mongoRepository.save(document);
    }

    @NewSpan
    private void handle(@SpanTag("event") BalanceDepositedEvent event) {
        log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
        final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
        if (documentOptional.isEmpty())
            throw new BankAccountDocumentNotFoundException(event.getAggregateId());

        final var document = documentOptional.get();
        final var newBalance = document.getBalance().add(event.getAmount());
        document.setBalance(newBalance);
        mongoRepository.save(document);
    }
}
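
Stripped of Kafka and MongoDB, the projection above boils down to dispatching on the event type and updating a read model. The sketch below shows that core with a `Map` standing in for the MongoDB collection; `ProjectionSketch`, `AccountView`, and the event records are hypothetical names, not the project's classes.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory projection; a Map stands in for the MongoDB collection.
class ProjectionSketch {

    interface AccountEvent { String aggregateId(); }
    record Created(String aggregateId, String email) implements AccountEvent {}
    record EmailChanged(String aggregateId, String newEmail) implements AccountEvent {}
    record Deposited(String aggregateId, BigDecimal amount) implements AccountEvent {}

    // The read model: one denormalized view per account.
    record AccountView(String aggregateId, String email, BigDecimal balance) {}

    private final Map<String, AccountView> readModel = new HashMap<>();

    // Applies one event to the read model, dispatching on the event type.
    void when(AccountEvent event) {
        if (event instanceof Created e) {
            readModel.put(e.aggregateId(), new AccountView(e.aggregateId(), e.email(), BigDecimal.ZERO));
        } else if (event instanceof EmailChanged e) {
            AccountView view = require(e.aggregateId());
            readModel.put(e.aggregateId(), new AccountView(view.aggregateId(), e.newEmail(), view.balance()));
        } else if (event instanceof Deposited e) {
            AccountView view = require(e.aggregateId());
            readModel.put(e.aggregateId(),
                    new AccountView(view.aggregateId(), view.email(), view.balance().add(e.amount())));
        }
    }

    // Applies a batch of events in the order they were received.
    void processEvents(List<AccountEvent> events) {
        events.forEach(this::when);
    }

    Optional<AccountView> find(String aggregateId) {
        return Optional.ofNullable(readModel.get(aggregateId));
    }

    private AccountView require(String aggregateId) {
        AccountView view = readModel.get(aggregateId);
        if (view == null) throw new IllegalStateException("no view for " + aggregateId);
        return view;
    }
}
```

Because the read model is derived entirely from events, it can be dropped and rebuilt at any time, which is what the `processEvents` fallback in the listing above relies on.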



Then we can retrieve projection data using the query service:



@Slf4j
@RequiredArgsConstructor
@Service
public class BankAccountQueryHandler implements BankAccountQueryService {

    private final EventStoreDB eventStoreDB;
    private final BankAccountMongoRepository mongoRepository;
    private static final String SERVICE_NAME = "microservice";

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public BankAccountResponseDTO handle(@SpanTag("query") GetBankAccountByIDQuery query) {
        Optional<BankAccountDocument> optionalDocument = mongoRepository.findByAggregateId(query.aggregateID());
        if (optionalDocument.isPresent()) {
            return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get());
        }

        final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class);
        final var savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate));
        log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument);

        final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate);
        log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO);
        return bankAccountResponseDTO;
    }

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public Page<BankAccountResponseDTO> handle(@SpanTag("query") FindAllOrderByBalance query) {
        return mongoRepository.findAll(PageRequest.of(query.page(), query.size(), Sort.by("balance")))
                .map(BankAccountMapper::bankAccountResponseDTOFromDocument);
    }
}
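
The fallback in `GetBankAccountByIDQuery` (read model first, event store second) is a read-through pattern. Here is a minimal sketch of it, assuming a hypothetical `rebuildFromEvents` function in place of `eventStoreDB.load` and plain strings in place of documents; `ReadThroughSketch` is not a class from the project.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of the query-side fallback: read model first, event store second.
class ReadThroughSketch {

    private final Map<String, String> readModel = new HashMap<>();
    private final Function<String, Optional<String>> rebuildFromEvents;
    int rebuilds = 0; // counts how often the slow path stored a rebuilt view

    ReadThroughSketch(Function<String, Optional<String>> rebuildFromEvents) {
        this.rebuildFromEvents = rebuildFromEvents;
    }

    Optional<String> get(String aggregateId) {
        String cached = readModel.get(aggregateId);
        if (cached != null) return Optional.of(cached); // fast path: projection exists

        // Slow path: replay the aggregate and store the result for later reads.
        Optional<String> rebuilt = rebuildFromEvents.apply(aggregateId);
        rebuilt.ifPresent(view -> {
            readModel.put(aggregateId, view);
            rebuilds++;
        });
        return rebuilt;
    }
}
```

The second read of the same account is served from the read model, so the event store is only touched when the projection is missing.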



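You can find more details and the source code of the full project here.
Of course, real-world applications need many more features, such as Kubernetes health checks, rate limiters, and so on.
Depending on the project, these can be implemented in different ways; for example, you can use Kubernetes and Istio for some of them.
I hope this article is useful. I am happy to receive any feedback or questions, so feel free to contact me by email or any messenger :)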

Source: https://dev.to/aleksk1ng/java-spring-eventsource-and-cqrs-clean-architecture-microservice-4e2d