Java Spring Event Sourcing and CQRS Clean Architecture Microservice 👋⚡️💫
In this article we will try to build a closer-to-real-world Event Sourcing and CQRS microservice using: 🚀👨‍💻🙌
👨‍💻 Full list of what has been used:
- Spring - Java Spring framework
- Spring Data JPA - data access layer
- Spring Data MongoDB - Spring Data support for MongoDB
- Spring Cloud Sleuth - distributed tracing
- Kafka - Spring for Apache Kafka
- PostgreSQL - PostgreSQL database
- Jaeger - distributed tracing system
- Docker - Docker
- Prometheus - Prometheus monitoring
- Grafana - Grafana dashboards
- Flyway - database migrations
- Resilience4j - lightweight, easy-to-use fault tolerance library
- Swagger OpenAPI 3 - Java library that helps automate the generation of API documentation
The source code is available in the GitHub repository.
The main idea of this project is to implement Event Sourcing and CQRS using Java, Spring and a PostgreSQL-based event store.
Previously I wrote a similar article where the same thing was implemented with Go and EventStoreDB.
I think EventStoreDB is the best choice for event sourcing, but on real projects we are often constrained by business requirements,
and using EventStoreDB, for example, may not be allowed. In that case, I think PostgreSQL and Kafka are a good alternative for building a custom event store.
This article does not go deep into the Event Sourcing and CQRS patterns; the best place to read about them is microservices.io, whose blog and documentation are excellent.
As in the previous article, I highly recommend Alexey Zimarev's book "Hands-On Domain-Driven Design with .NET Core" and his blog.
In this project the microservice's event store is implemented with PostgreSQL, Spring Data JPA
and Kafka, projections are built with Spring Data MongoDB, and communication goes over REST.
No interesting business logic is implemented here, and tests are not written, simply for lack of time.
The list of events is also simplified: create a new bank account, change email or address, and deposit money.
Of course, in the real world it is better to use more specific and meaningful events, but the goal here is to show the idea and how it works.
All UIs are available on the following ports:
Swagger UI: http://localhost:8006/swagger-ui/index.html
Jaeger UI: http://localhost:16686
Prometheus UI: http://localhost:9090
Grafana UI: http://localhost:3005
The Docker Compose file for this project:
version: "3.9"
services:
postgresql:
image: postgres:14.2
container_name: postgresql
expose:
- "5432"
ports:
- "5432:5432"
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=microservices
- POSTGRES_HOST=5432
command: -p 5432
volumes:
- ./pgdata:/var/lib/postgresql/data
networks: [ "microservices" ]
zookeeper:
image: 'bitnami/zookeeper:3.8.0'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
volumes:
- "./zookeeper:/zookeeper"
networks: [ "microservices" ]
kafka:
image: 'bitnami/kafka:3.0.1'
ports:
- "9092:9092"
- "9093:9093"
volumes:
- "./kafka_data:/bitnami"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
networks: [ "microservices" ]
redis:
image: redis:6-alpine
restart: always
container_name: microservices_redis
ports:
- "6379:6379"
networks: [ "microservices" ]
mongodb:
image: docker.io/bitnami/mongodb:4.4
restart: always
container_name: microservices_mongo
environment:
MONGODB_ROOT_USER: admin
MONGODB_ROOT_PASSWORD: admin
BITNAMI_DEBUG: "false"
ALLOW_EMPTY_PASSWORD: "no"
MONGODB_SYSTEM_LOG_VERBOSITY: "0"
MONGODB_DISABLE_SYSTEM_LOG: "no"
MONGODB_DISABLE_JAVASCRIPT: "no"
MONGODB_ENABLE_JOURNAL: "yes"
MONGODB_ENABLE_IPV6: "no"
MONGODB_ENABLE_DIRECTORY_PER_DB: "no"
MONGODB_DATABASE: "microservices"
volumes:
- ./mongodb_data_container:/data/db
ports:
- "27017:27017"
networks: [ "microservices" ]
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
command:
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
networks: [ "microservices" ]
node_exporter:
container_name: node_exporter_container
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks: [ "microservices" ]
grafana:
container_name: grafana_container
restart: always
image: grafana/grafana
ports:
- '3005:3000'
networks: [ "microservices" ]
jaeger:
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.21
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"
- "14250:14250"
- "9411:9411"
networks: [ "microservices" ]
networks:
microservices:
name: microservices
An AggregateRoot can be implemented in different ways; the essence is loading events and applying/raising changes.
When we fetch an aggregate from the database,
instead of reading its state as a single record in a table or document, we read all previously saved events and call the when method for each of them.
After these steps we have restored the whole history of the given aggregate
and brought it up to its latest state.
@Data
@NoArgsConstructor
public abstract class AggregateRoot {
protected String id;
protected String type;
protected long version;
protected final List<Event> changes = new ArrayList<>();
public AggregateRoot(final String id, final String aggregateType) {
this.id = id;
this.type = aggregateType;
}
public abstract void when(final Event event);
public void load(final List<Event> events) {
events.forEach(event -> {
this.validateEvent(event);
this.raiseEvent(event);
});
}
public void apply(final Event event) {
this.validateEvent(event);
event.setAggregateType(this.type);
when(event);
changes.add(event);
this.version++;
event.setVersion(this.version);
}
public void raiseEvent(final Event event) {
this.validateEvent(event);
event.setAggregateType(this.type);
when(event);
this.version++;
}
public void clearChanges() {
this.changes.clear();
}
private void validateEvent(final Event event) {
if (Objects.isNull(event) || !event.getAggregateId().equals(this.id))
throw new InvalidEventException(event.toString());
}
}
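The bank account aggregate shown later calls a createEvent helper that is not included in the AggregateRoot listing above. A minimal sketch of what such a helper might look like, assuming it only pre-fills the Event with this aggregate's id and type (apply then assigns the final version):
protected Event createEvent(String eventType, byte[] data, byte[] metaData) {
    // Hypothetical helper, not part of the listing above: wraps a payload into an Event
    // that carries this aggregate's id and type; apply() later sets the version.
    return Event.builder()
            .id(UUID.randomUUID())
            .aggregateId(this.id)
            .aggregateType(this.type)
            .eventType(eventType)
            .data(data)
            .metaData(metaData)
            .timeStamp(LocalDateTime.now())
            .build();
}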
Events represent facts that happened in the domain. They are the source of truth; your current state is derived from these events.
Events are immutable and represent business facts.
That means we never change or remove anything in the database; we only append new events.
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Event {
private UUID id;
private String aggregateId;
private String eventType;
private String aggregateType;
private long version;
private byte[] data;
private byte[] metaData;
private LocalDateTime timeStamp;
public Event(String eventType, String aggregateType) {
this.id = UUID.randomUUID();
this.eventType = eventType;
this.aggregateType = aggregateType;
this.timeStamp = LocalDateTime.now();
}
}
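The concrete domain events (BankAccountCreatedEvent, EmailChangedEvent, AddressUpdatedEvent, BalanceDepositedEvent) are not listed in this article; they are plain payload classes that get serialized into the data field of Event. A minimal sketch of one of them, inferred from how the aggregate uses it below (the Lombok annotations and the constant's value are assumptions):
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BankAccountCreatedEvent {
    // Event type constant referenced by the aggregate's when() switch.
    public static final String BANK_ACCOUNT_CREATED_V1 = "BANK_ACCOUNT_CREATED_V1";

    private String aggregateId;
    private String email;
    private String userName;
    private String address;
}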
If we followed the event sourcing pattern strictly, we would have to fetch all of these transaction events to compute the current account balance,
which would be inefficient. The first thing that comes to mind to improve performance is to cache the latest state.
Instead of retrieving every event, we retrieve a single record and use it in our business logic. That is a snapshot.
In this microservice we create a snapshot from the aggregate every N events and save it to PostgreSQL.
When we load an aggregate from the event store, we first load the snapshot and then the events whose version is greater than the snapshot's.
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Snapshot {
private UUID id;
private String aggregateId;
private String aggregateType;
private byte[] data;
private byte[] metaData;
private long version;
private LocalDateTime timeStamp;
}
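The event store below converts aggregates to snapshots and back through an EventSourcingUtils helper, which is not listed here. A rough sketch of the idea, assuming Jackson-based JSON serialization (the implementation in the repository may differ):
public final class EventSourcingUtils {
    // Shared Jackson mapper; JavaTimeModule is assumed for the LocalDateTime fields.
    private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());

    private EventSourcingUtils() {
    }

    // Serialize the aggregate's current state into a Snapshot.
    public static <T extends AggregateRoot> Snapshot snapshotFromAggregate(final T aggregate) {
        try {
            return Snapshot.builder()
                    .id(UUID.randomUUID())
                    .aggregateId(aggregate.getId())
                    .aggregateType(aggregate.getType())
                    .version(aggregate.getVersion())
                    .data(MAPPER.writeValueAsBytes(aggregate))
                    .timeStamp(LocalDateTime.now())
                    .build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    // Restore an aggregate of the given type from a snapshot's serialized data.
    public static <T extends AggregateRoot> T aggregateFromSnapshot(final Snapshot snapshot, final Class<T> valueType) {
        try {
            return MAPPER.readValue(snapshot.getData(), valueType);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}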
The AggregateStore implementation consists of load, save and exists methods. load
and save accept an aggregate and then load or apply its events. The load
method finds the aggregate's stream, reads all events in that stream,
iterates over them and calls the raiseEvent handler for each one. The save
method persists the aggregate by saving its list of changes, and it handles concurrency:
when we retrieve a stream we note its current version,
so on save we can tell whether someone has modified the record in the meantime. The PostgreSQL
event store implementation looks like this:
@Repository
@RequiredArgsConstructor
@Slf4j
public class EventStore implements EventStoreDB {
public static final int SNAPSHOT_FREQUENCY = 3;
private static final String SAVE_EVENTS_QUERY = "INSERT INTO events (aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp) values (:aggregate_id, :aggregate_type, :event_type, :data, :metadata, :version, now())";
private static final String LOAD_EVENTS_QUERY = "SELECT event_id, aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp FROM events e WHERE e.aggregate_id = :aggregate_id AND e.version > :version ORDER BY e.version ASC";
private static final String SAVE_SNAPSHOT_QUERY = "INSERT INTO snapshots (aggregate_id, aggregate_type, data, metadata, version, timestamp) VALUES (:aggregate_id, :aggregate_type, :data, :metadata, :version, now()) ON CONFLICT (aggregate_id) DO UPDATE SET data = :data, version = :version, timestamp = now()";
private static final String HANDLE_CONCURRENCY_QUERY = "SELECT aggregate_id FROM events e WHERE e.aggregate_id = :aggregate_id LIMIT 1 FOR UPDATE";
private static final String LOAD_SNAPSHOT_QUERY = "SELECT aggregate_id, aggregate_type, data, metadata, version, timestamp FROM snapshots s WHERE s.aggregate_id = :aggregate_id";
private static final String EXISTS_QUERY = "SELECT aggregate_id FROM events e WHERE e.aggregate_id = :aggregate_id LIMIT 1";
private final NamedParameterJdbcTemplate jdbcTemplate;
private final EventBus eventBus;
@Override
@Transactional
@NewSpan
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
if (aggregate.getVersion() > 1) {
this.handleConcurrency(aggregate.getId());
}
this.saveEvents(aggregate.getChanges());
if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) {
this.saveSnapshot(aggregate);
}
eventBus.publish(aggregateEvents);
log.info("(save) saved aggregate: {}", aggregate);
}
@Override
@Transactional(readOnly = true)
@NewSpan
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType);
final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
events.forEach(event -> {
aggregate.raiseEvent(event);
log.info("raise event version: {}", event.getVersion());
});
if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId);
log.info("(load) loaded aggregate: {}", aggregate);
return aggregate;
}
@Override
@NewSpan
public void saveEvents(@SpanTag("events") List<Event> events) {
if (events.isEmpty()) return;
final List<Event> changes = new ArrayList<>(events);
if (changes.size() > 1) {
this.eventsBatchInsert(changes);
return;
}
final Event event = changes.get(0);
int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, mapFromEvent(event));
log.info("(saveEvents) saved result: {}, event: {}", result, event);
}
private Map<String, Serializable> mapFromEvent(Event event) {
return Map.of(
AGGREGATE_ID, event.getAggregateId(),
AGGREGATE_TYPE, event.getAggregateType(),
EVENT_TYPE, event.getEventType(),
DATA, Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
METADATA, Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
VERSION, event.getVersion());
}
@NewSpan
private void eventsBatchInsert(@SpanTag("events") List<Event> events) {
final var args = events.stream().map(this::mapFromEvent).toList();
final Map<String, ?>[] maps = args.toArray(new Map[0]);
int[] ints = jdbcTemplate.batchUpdate(SAVE_EVENTS_QUERY, maps);
log.info("(saveEvents) BATCH saved result: {}, event: {}", ints);
}
@Override
@NewSpan
public List<Event> loadEvents(@SpanTag("aggregateId") String aggregateId, @SpanTag("version") long version) {
return jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of(AGGREGATE_ID, aggregateId, VERSION, version),
(rs, rowNum) -> Event.builder()
.aggregateId(rs.getString(AGGREGATE_ID))
.aggregateType(rs.getString(AGGREGATE_TYPE))
.eventType(rs.getString(EVENT_TYPE))
.data(rs.getBytes(DATA))
.metaData(rs.getBytes(METADATA))
.version(rs.getLong(VERSION))
.timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime())
.build());
}
@NewSpan
private <T extends AggregateRoot> void saveSnapshot(@SpanTag("aggregate") T aggregate) {
aggregate.toSnapshot();
final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);
int updateResult = jdbcTemplate.update(SAVE_SNAPSHOT_QUERY,
Map.of(AGGREGATE_ID, snapshot.getAggregateId(),
AGGREGATE_TYPE, snapshot.getAggregateType(),
DATA, Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(),
METADATA, Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(),
VERSION, snapshot.getVersion()));
log.info("(saveSnapshot) updateResult: {}", updateResult);
}
@NewSpan
private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
try {
String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class);
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
} catch (EmptyResultDataAccessException e) {
log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage());
}
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId);
}
@NewSpan
private Optional<Snapshot> loadSnapshot(@SpanTag("aggregateId") String aggregateId) {
return jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of(AGGREGATE_ID, aggregateId), (rs, rowNum) -> Snapshot.builder()
.aggregateId(rs.getString(AGGREGATE_ID))
.aggregateType(rs.getString(AGGREGATE_TYPE))
.data(rs.getBytes(DATA))
.metaData(rs.getBytes(METADATA))
.version(rs.getLong(VERSION))
.timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime())
.build()).stream().findFirst();
}
@NewSpan
private <T extends AggregateRoot> T getAggregate(@SpanTag("aggregateId") final String aggregateId, @SpanTag("aggregateType") final Class<T> aggregateType) {
try {
return aggregateType.getConstructor(String.class).newInstance(aggregateId);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@NewSpan
private <T extends AggregateRoot> T getSnapshotFromClass(@SpanTag("snapshot") Optional<Snapshot> snapshot, @SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
if (snapshot.isEmpty()) {
final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType);
}
return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType);
}
@Override
@NewSpan
public Boolean exists(@SpanTag("aggregateId") String aggregateId) {
try {
final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class);
log.info("aggregate exists id: {}", id);
return true;
} catch (Exception ex) {
if (!(ex instanceof EmptyResultDataAccessException)) {
throw new RuntimeException(ex);
}
return false;
}
}
}
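The save method above publishes the freshly stored events through an EventBus, whose implementation is not listed in this article. A minimal sketch of a Kafka-backed event bus, assuming a KafkaTemplate&lt;String, byte[]&gt; bean and the same topic property that the Mongo projection listener consumes later on:
@Component
@Slf4j
@RequiredArgsConstructor
public class KafkaEventBus implements EventBus {
    private final KafkaTemplate<String, byte[]> kafkaTemplate;

    // Assumed to be the same topic the projection's @KafkaListener subscribes to.
    @Value("${microservice.kafka.topics.bank-account-event-store}")
    private String bankAccountTopicName;

    @Override
    public void publish(List<Event> events) {
        // The projection deserializes the payload with deserializeEventsFromJsonBytes,
        // so the whole batch is sent as a single JSON array of events.
        final byte[] eventsBytes = SerializerUtils.serializeToJsonBytes(events.toArray(new Event[]{}));
        kafkaTemplate.send(bankAccountTopicName, eventsBytes);
        log.info("(KafkaEventBus) published {} events to topic: {}", events.size(), bankAccountTopicName);
    }
}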
Next, let's create a bank account aggregate:
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class BankAccountAggregate extends AggregateRoot {
public static final String AGGREGATE_TYPE = "BankAccountAggregate";
public BankAccountAggregate(String id) {
super(id, AGGREGATE_TYPE);
}
private String email;
private String userName;
private String address;
private BigDecimal balance;
@Override
public void when(Event event) {
switch (event.getEventType()) {
case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 ->
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class));
case EmailChangedEvent.EMAIL_CHANGED_V1 ->
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class));
case AddressUpdatedEvent.ADDRESS_UPDATED_V1 ->
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
case BalanceDepositedEvent.BALANCE_DEPOSITED ->
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
default -> throw new InvalidEventTypeException(event.getEventType());
}
}
private void handle(final BankAccountCreatedEvent event) {
this.email = event.getEmail();
this.userName = event.getUserName();
this.address = event.getAddress();
this.balance = BigDecimal.valueOf(0);
}
private void handle(final EmailChangedEvent event) {
Objects.requireNonNull(event.getNewEmail());
if (event.getNewEmail().isBlank()) throw new InvalidEmailException();
this.email = event.getNewEmail();
}
private void handle(final AddressUpdatedEvent event) {
Objects.requireNonNull(event.getNewAddress());
if (event.getNewAddress().isBlank()) throw new InvalidAddressException();
this.address = event.getNewAddress();
}
private void handle(final BalanceDepositedEvent event) {
Objects.requireNonNull(event.getAmount());
this.balance = this.balance.add(event.getAmount());
}
public void createBankAccount(String email, String address, String userName) {
final var data = BankAccountCreatedEvent.builder()
.aggregateId(id)
.email(email)
.address(address)
.userName(userName)
.build();
final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
final var event = this.createEvent(BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1, dataBytes, null);
this.apply(event);
}
public void changeEmail(String email) {
final var data = EmailChangedEvent.builder().aggregateId(id).newEmail(email).build();
final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
final var event = this.createEvent(EmailChangedEvent.EMAIL_CHANGED_V1, dataBytes, null);
apply(event);
}
public void changeAddress(String newAddress) {
final var data = AddressUpdatedEvent.builder().aggregateId(id).newAddress(newAddress).build();
final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
final var event = this.createEvent(AddressUpdatedEvent.ADDRESS_UPDATED_V1, dataBytes, null);
apply(event);
}
public void depositBalance(BigDecimal amount) {
final var data = BalanceDepositedEvent.builder().aggregateId(id).amount(amount).build();
final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
final var event = this.createEvent(BalanceDepositedEvent.BALANCE_DEPOSITED, dataBytes, null);
apply(event);
}
}
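The aggregate serializes its event payloads with a SerializerUtils helper that is also not listed here. A rough Jackson-based sketch of the three methods used throughout this article (the real implementation may differ):
public final class SerializerUtils {
    // Shared Jackson mapper; JavaTimeModule is assumed for date/time fields.
    private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());

    private SerializerUtils() {
    }

    public static byte[] serializeToJsonBytes(final Object object) {
        try {
            return MAPPER.writeValueAsBytes(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T deserializeFromJsonBytes(final byte[] data, final Class<T> valueType) {
        try {
            return MAPPER.readValue(data, valueType);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static Event[] deserializeEventsFromJsonBytes(final byte[] data) {
        try {
            return MAPPER.readValue(data, Event[].class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}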
Our microservice accepts HTTP requests. Swagger OpenAPI 3 is used for the API documentation. The bank account REST controller accepts requests, validates them with Hibernate Validator, and then calls the command or query service. One of the main reasons CQRS is popular is that reads and writes are handled separately, because the optimization techniques for these two kinds of operations differ enormously.
@RestController
@RequestMapping(path = "/api/v1/bank")
@Slf4j
@RequiredArgsConstructor
public class BankAccountController {
private final BankAccountCommandService commandService;
private final BankAccountQueryService queryService;
@GetMapping("{aggregateId}")
public ResponseEntity<BankAccountResponseDTO> getBankAccount(@PathVariable String aggregateId) {
final var result = queryService.handle(new GetBankAccountByIDQuery(aggregateId));
log.info("GET bank account result: {}", result);
return ResponseEntity.ok(result);
}
@PostMapping
public ResponseEntity<String> createBankAccount(@Valid @RequestBody CreateBankAccountRequestDTO dto) {
final var aggregateID = UUID.randomUUID().toString();
final var id = commandService.handle(new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address()));
log.info("CREATE bank account id: {}", id);
return ResponseEntity.status(HttpStatus.CREATED).body(id);
}
@PostMapping(path = "/deposit/{aggregateId}")
public ResponseEntity<Void> depositAmount(@Valid @RequestBody DepositAmountRequestDTO dto, @PathVariable String aggregateId) {
commandService.handle(new DepositAmountCommand(aggregateId, dto.amount()));
return ResponseEntity.ok().build();
}
@PostMapping(path = "/email/{aggregateId}")
public ResponseEntity<Void> changeEmail(@Valid @RequestBody ChangeEmailRequestDTO dto, @PathVariable String aggregateId) {
commandService.handle(new ChangeEmailCommand(aggregateId, dto.newEmail()));
return ResponseEntity.ok().build();
}
@PostMapping(path = "/address/{aggregateId}")
public ResponseEntity<Void> changeAddress(@Valid @RequestBody ChangeAddressRequestDTO dto, @PathVariable String aggregateId) {
commandService.handle(new ChangeAddressCommand(aggregateId, dto.newAddress()));
return ResponseEntity.ok().build();
}
@GetMapping("/balance")
public ResponseEntity<Page<BankAccountResponseDTO>> getAllOrderByBalance(@RequestParam(name = "page", defaultValue = "0") Integer page,
@RequestParam(name = "size", defaultValue = "10") Integer size) {
final var result = queryService.handle(new FindAllOrderByBalance(page, size));
log.info("GET all by balance result: {}", result);
return ResponseEntity.ok(result);
}
}
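The request DTOs validated by Hibernate Validator are not listed in the article. A minimal sketch of one of them as a Java record with bean-validation annotations (the concrete constraints are an assumption):
// Illustrative request DTO; the field names come from the controller above, the constraints are assumed.
public record CreateBankAccountRequestDTO(
        @Email @NotBlank @Size(max = 250) String email,
        @NotBlank @Size(max = 500) String address,
        @NotBlank @Size(max = 250) String userName) {
}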
The command service handles the CQRS commands: it loads the aggregate from the event store and calls its methods according to the application's business logic.
The aggregate applies these changes, and the resulting list of event changes is then saved to the event store.
For microservice resilience Resilience4j is used here;
it is configured in the properties file. This is very convenient, because in real-world deployments such settings often have to be tweaked, for example through Kubernetes YAML manifests.
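The commands themselves are simple immutable records and the command service is a small interface; they are not listed here, but a sketch inferred from how they are used above and below looks roughly like this:
// Illustrative CQRS command types (each in its own file), inferred from the controller and handler code.
public record CreateBankAccountCommand(String aggregateID, String email, String userName, String address) {
}

public record ChangeEmailCommand(String aggregateID, String newEmail) {
}

public record ChangeAddressCommand(String aggregateID, String newAddress) {
}

public record DepositAmountCommand(String aggregateID, BigDecimal amount) {
}

// Command-side service interface implemented by BankAccountCommandHandler below.
public interface BankAccountCommandService {
    String handle(CreateBankAccountCommand command);

    void handle(ChangeEmailCommand command);

    void handle(ChangeAddressCommand command);

    void handle(DepositAmountCommand command);
}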
@RequiredArgsConstructor
@Slf4j
@Service
public class BankAccountCommandHandler implements BankAccountCommandService {
private final EventStoreDB eventStoreDB;
private static final String SERVICE_NAME = "microservice";
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public String handle(@SpanTag("command") CreateBankAccountCommand command) {
final var aggregate = new BankAccountAggregate(command.aggregateID());
aggregate.createBankAccount(command.email(), command.address(), command.userName());
eventStoreDB.save(aggregate);
log.info("(CreateBankAccountCommand) aggregate: {}", aggregate);
return aggregate.getId();
}
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public void handle(@SpanTag("command") ChangeEmailCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.changeEmail(command.newEmail());
eventStoreDB.save(aggregate);
log.info("(ChangeEmailCommand) aggregate: {}", aggregate);
}
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public void handle(@SpanTag("command") ChangeAddressCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.changeAddress(command.newAddress());
eventStoreDB.save(aggregate);
log.info("(ChangeAddressCommand) aggregate: {}", aggregate);
}
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public void handle(@SpanTag("command") DepositAmountCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.depositBalance(command.amount());
eventStoreDB.save(aggregate);
log.info("(DepositAmountCommand) aggregate: {}", aggregate);
}
}

The process of building state from events is called a projection.
We can subscribe to the ordered stream of events to build projections.
When we execute a command, the aggregate produces a new event that represents its state transition.
These events are committed to the store, which appends them to the end of the aggregate's stream.
The projection receives these events and updates its read model in the `when` method. Like the aggregate, the projection applies the appropriate change depending on the event type:
@Service
@Slf4j
@RequiredArgsConstructor
public class BankAccountMongoProjection implements Projection {
private final BankAccountMongoRepository mongoRepository;
private final EventStoreDB eventStoreDB;
private static final String SERVICE_NAME = "microservice";
@KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
groupId = "${microservice.kafka.groupId}",
concurrency = "${microservice.kafka.default-concurrency}")
public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) {
log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}, data: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), new String(data));
try {
final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data);
this.processEvents(Arrays.stream(events).toList());
ack.acknowledge();
log.info("ack events: {}", Arrays.toString(events));
} catch (Exception ex) {
ack.nack(100);
log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), ex);
}
}
@NewSpan
private void processEvents(@SpanTag("events") List<Event> events) {
if (events.isEmpty()) return;
try {
events.forEach(this::when);
} catch (Exception ex) {
mongoRepository.deleteByAggregateId(events.get(0).getAggregateId());
final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class);
final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate);
final var result = mongoRepository.save(document);
log.info("(processEvents) saved document: {}", result);
}
}
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public void when(@SpanTag("event") Event event) {
final var aggregateId = event.getAggregateId();
log.info("(when) >>>>> aggregateId: {}", aggregateId);
switch (event.getEventType()) {
case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 ->
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class));
case EmailChangedEvent.EMAIL_CHANGED_V1 ->
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class));
case AddressUpdatedEvent.ADDRESS_UPDATED_V1 ->
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
case BalanceDepositedEvent.BALANCE_DEPOSITED ->
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
default -> log.error("unknown event type: {}", event.getEventType());
}
}
@NewSpan
private void handle(@SpanTag("event") BankAccountCreatedEvent event) {
log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var document = BankAccountDocument.builder()
.aggregateId(event.getAggregateId())
.email(event.getEmail())
.address(event.getAddress())
.userName(event.getUserName())
.balance(BigDecimal.valueOf(0))
.build();
final var insert = mongoRepository.insert(document);
log.info("(BankAccountCreatedEvent) insert: {}", insert);
}
@NewSpan
private void handle(@SpanTag("event") EmailChangedEvent event) {
log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
final var document = documentOptional.get();
document.setEmail(event.getNewEmail());
mongoRepository.save(document);
}
@NewSpan
private void handle(@SpanTag("event") AddressUpdatedEvent event) {
log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
final var document = documentOptional.get();
document.setAddress(event.getNewAddress());
mongoRepository.save(document);
}
@NewSpan
private void handle(@SpanTag("event") BalanceDepositedEvent event) {
log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
final var document = documentOptional.get();
final var newBalance = document.getBalance().add(event.getAmount());
document.setBalance(newBalance);
mongoRepository.save(document);
}
}
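The MongoDB read model consists of a BankAccountDocument and a Spring Data repository; neither is listed in the article. A rough sketch, assuming a bankAccounts collection and the fields used by the projection above:
// Illustrative MongoDB read-model document; the collection name and index are assumptions.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "bankAccounts")
public class BankAccountDocument {
    @Id
    private String id;

    @Indexed(unique = true)
    private String aggregateId;

    private String email;
    private String userName;
    private String address;
    private BigDecimal balance;
}

// Spring Data MongoDB repository with the derived queries used by the projection and the query handler.
public interface BankAccountMongoRepository extends MongoRepository<BankAccountDocument, String> {
    Optional<BankAccountDocument> findByAggregateId(String aggregateId);

    void deleteByAggregateId(String aggregateId);
}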
Then we can retrieve the projection data with the query service:
@Slf4j
@RequiredArgsConstructor
@Service
public class BankAccountQueryHandler implements BankAccountQueryService {
private final EventStoreDB eventStoreDB;
private final BankAccountMongoRepository mongoRepository;
private static final String SERVICE_NAME = "microservice";
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public BankAccountResponseDTO handle(@SpanTag("query") GetBankAccountByIDQuery query) {
Optional<BankAccountDocument> optionalDocument = mongoRepository.findByAggregateId(query.aggregateID());
if (optionalDocument.isPresent()) {
return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get());
}
final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class);
final var savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate));
log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument);
final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate);
log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO);
return bankAccountResponseDTO;
}
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public Page<BankAccountResponseDTO> handle(@SpanTag("query") FindAllOrderByBalance query) {
return mongoRepository.findAll(PageRequest.of(query.page(), query.size(), Sort.by("balance")))
.map(BankAccountMapper::bankAccountResponseDTOFromDocument);
}
}
You can find more details and the full source code here. Of course,
a real application would need many more features, such as Kubernetes health checks, rate limiters, and so on.
Depending on the project they can be implemented in different ways; for some of them you could, for example, use Kubernetes and Istio.
I hope this article was helpful. I would be glad to receive any feedback or questions, so feel free to contact me by email or any messenger :)



