发布于 2026-01-06 0 阅读
0

使用 Symfony Messenger 实现 CQRS 简介 使用 Symfony Messenger 实现 CQRS 结论

CQRS 与 Symfony Messenger

介绍

使用 Symfony Messenger 实现 CQRS

结论

查看西班牙语版本

介绍

我们通常使用相同的数据结构来写入和查询系统中的信息。对于大型系统而言,这会导致数据结构更加庞大,因为它需要将读取和写入操作集成到同一个模型中。例如,在写入信息时,我们可能需要进行大量的验证以确保持久化信息的正确性;而查询这些信息时,为了检索经过筛选的数据或针对不同情况使用不同的数据结构,查询过程可能既复杂又繁琐。

CQRS 是一种将数据存储的读取操作和更新操作分离的模式。在应用程序中实施 CQRS 可以最大限度地提高其性能、可扩展性和安全性。迁移到 CQRS 所带来的灵活性使系统能够更好地随时间演进,并防止更新命令在域级别引发合并冲突。

模式

CQRS 将读取结构与写入模型分离,读取结构使用查询来读取数据,而写入模型使用命令来对数据进行操作。

  • 命令应该基于任务,这意味着我们需要关注命令的操作。例如,在配送应用中,当用户下单时,我们会将操作命名为 OrderProductCommand,而不是 AddProductToClient 或 CreateNewOrderProduct。这样做也能使我们的应用层更加一致。
  • 查询操作不会修改数据库。查询返回的 DTO 不包含任何领域知识。我们需要关注的是所需信息,而不是领域行为。

好处

  • 独立扩展。它允许读取模型和写入模型独立扩展。
  • 优化的数据模式。读取模型可以使用针对查询优化的模式,而写入模型可以使用针对更新优化的模式。
  • 安全性。更容易确保只有正确的域实体才能执行写入操作。
  • 关注点分离。复杂的业务逻辑放在写入模型中。读取模型可以很简单。

使用 Symfony Messenger 实现 CQRS

消息传递组件帮助应用程序与其他应用程序或通过消息队列收发消息。它还允许我们定义自定义消息总线,包括消息类型和处理程序。

我们来谈谈软件架构。

指令总线

命令总线架构

谈到命令,我们需要构建一个通用接口,以便消息总线能够管理并将其传输给各个处理程序。命令接口最终将成为每个命令的基础接口。每个命令处理程序的实现者都会对给定的命令执行操作,但命令本身并不知道执行的是哪个操作。此外,我们还将为命令总线创建一个接口,以便为我们的消息(命令)创建不同类型的传输器。在本例中,我们创建了一个内存命令总线,但如果需要,我们可以轻松扩展这个概念(例如:队列工作)。

我们的 Symfony 应用程序的源代码中将会出现类似这样的内容。

命令目录组织



<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Command;

interface Command
{
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Command;

interface CommandBus
{
    public function dispatch(Command $command) : void;
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Command;

interface CommandHandler
{
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus\Command;

use ...

final class InMemoryCommandBus implements CommandBus
{
    private MessageBus $bus;

    public function __construct(
        iterable $commandHandlers
    ) {
        $this->bus = new MessageBus([
            new HandleMessageMiddleware(
                new HandlersLocator(
                    HandlerBuilder::fromCallables($commandHandlers),
                ),
            ),
        ]);
    }

    /**
     * @throws Throwable
     */
    public function dispatch(Command $command): void
    {
        try {
            $this->bus->dispatch($command);
        } catch (NoHandlerForMessageException $e) {
            throw new InvalidArgumentException(sprintf('The command has not a valid handler: %s', $command::class));
        } catch (HandlerFailedException $e) {
            throw $e->getPrevious();
        }
    }
}


Enter fullscreen mode Exit fullscreen mode

对于内存命令总线类,我们需要注册来自服务定义的每个命令处理器。我们将利用 Symfony 的一项名为“服务标签”的功能,该功能由服务容器和自动配置提供。它允许我们标记服务,以便稍后调用。在 `config/services.yaml` 文件中,我们指示服务容器使用标签标记每个命令处理器接口的实例。internal.command_handler之后,我们声明内存命令总线,并将所有命令处理器的实现者作为可迭代参数传递。命令总线将接收每个命令处理器,并声明预期命令及其对应的处理器。



parameters:

services:
    _defaults:
        autowire: true
        autoconfigure: true

    _instanceof:
        App\Shared\Domain\Bus\Command\CommandHandler:
            tags: ['internal.command_handler']
...
    ### Buses
    App\Shared\Domain\Bus\Command\CommandBus:
        class: App\Shared\Infrastructure\Bus\Command\InMemoryCommandBus
        arguments: [!tagged internal.command_handler]


Enter fullscreen mode Exit fullscreen mode

我们可以创建一个处理程序构建器工具,负责在命令处理程序实现者的 `__invoke` 方法中查找,并将第一个参数类型作为调用该处理程序所需的命令。此时,我们制定了一个约定:每个命令处理程序都必须是可调用的,并且只有一个参数,其类型为命令本身。



<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus;

use ...

final class HandlerBuilder
{
    /**
     * @throws ReflectionException
     */
    public static function fromCallables(iterable $callables) : array
    {
        $callablesHandlers = [];

        foreach ($callables as $callable) {
            $envelop = self::extractFirstParam($callable);

            if (! array_key_exists($envelop, $callablesHandlers)) {
                $callablesHandlers[self::extractFirstParam($callable)] = [];
            }

            $callablesHandlers[self::extractFirstParam($callable)][] = $callable;
        }

        return $callablesHandlers;
    }

    /**
     * @throws ReflectionException
     */
    private static function extractFirstParam(object|string $class) : string|null
    {
        $reflection = new ReflectionClass($class);
        $method     = $reflection->getMethod('__invoke');

        if ($method->getNumberOfParameters() === 1) {
            return $method->getParameters()[0]->getClass()?->getName();
        }

        return null;
    }
}


Enter fullscreen mode Exit fullscreen mode

有了这些信息,我们就可以开始构建命令了,例如:

指挥执行组织



<?php

declare(strict_types=1);

namespace App\EmailSender\Application\Create;

use ...

final class CreateEmailCommand implements Command
{
    public function __construct(
        private readonly string $sender,
        private readonly string $addressee,
        private readonly string $message,
    ) {
    }

    public function sender(): string
    {
        return $this->sender;
    }

    public function addressee(): string
    {
        return $this->addressee;
    }

    public function message(): string
    {
        return $this->message;
    }
}


Enter fullscreen mode Exit fullscreen mode

我们可以创建一个创建电子邮件命令,它包含创建新电子邮件所需的信息,但它不知道创建电子邮件所需的具体流程。



<?php

declare(strict_types=1);

namespace App\EmailSender\Application\Create;

use ...

class CreateEmailCommandHandler implements CommandHandler
{
    public function __construct(private EmailRepository $repository)
    {
    }

    public function __invoke(CreateEmailCommand $command) : EmailId {
        $email = Email::createNewEmail(
            sender: new EmailAddress($command->sender()),
            addressee: new EmailAddress($command->addressee()),
            message: new Message($command->message()),
        );

        $this->repository->save($email);

        return $email->id();
    }
}


Enter fullscreen mode Exit fullscreen mode

我们为之前的命令创建了一个处理程序,它知道我们将使用类型为“创建电子邮件命令”的唯一参数调用该对象,并且它知道创建新电子邮件所需的所有过程。



<?php

declare(strict_types=1);

namespace App\EmailSender\Infrastructure\Http;

use ...

class CreateEmailAction
{
    public function __construct(
        private readonly CreateEmailResponder $responder,
        private readonly CommandBus $commandBus,
    ) {
    }

    public function __invoke(Request $request) : Response
    {
        try {
            $this->commandBus->dispatch(
                new CreateEmailCommand(
                    sender: $request->request->get('sender'),
                    addressee: $request->request->get('addressee'),
                    message: $request->request->get('message'),
                ),
            );
        } catch (Exception $e) {
            $this->responder->loadError($e->getMessage());
        }

        return $this->responder->response();
    }
}


Enter fullscreen mode Exit fullscreen mode

因此,我们可以轻松地将命令总线注入到 Action(控制器)类中并分发命令。Action 类本身并不知道应用程序核心发生了什么,但命令总线可以确保我们将命令发送到合适的 Handler,并由 Handler 执行相应的操作。请注意,我们知道将要执行的操作,它由命令名称提供。

查询总线

查询架构

我们来看一下查询总线的模型。我们可以定义一个非常类似的架构,但现在我们需要在查询请求某些内容时返回一个值,这就需要引入响应的概念。响应可以是领域对象的集合,也可以是单个对象,或者任何其他类型的对象。决定响应是什么的是查询处理器,它知道需要生成哪些信息。

最后,我们得到类似这样的结果:

查询目录组织



<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface Query
{
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface QueryBus
{
    public function ask(Query $query) : Response|null;
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface QueryHandler
{
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface Response
{
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus\Query;

use ...

final class InMemoryQueryBus implements QueryBus
{
    private MessageBus $bus;

    public function __construct(iterable $queryHandlers)
    {
        $this->bus = new MessageBus([
            new HandleMessageMiddleware(
                new HandlersLocator(
                    HandlerBuilder::fromCallables($queryHandlers),
                ),
            ),
        ]);
    }

    public function ask(Query $query): Response|null
    {
        try {
            /** @var HandledStamp $stamp */
            $stamp = $this->bus->dispatch($query)->last(HandledStamp::class);

            return $stamp->getResult();
        } catch (NoHandlerForMessageException $e) {
            throw new InvalidArgumentException(sprintf('The query has not a valid handler: %s', $query::class));
        }
    }
}


Enter fullscreen mode Exit fullscreen mode

我们可以使用相同的方法来注册查询处理程序和查询,使用构建处理程序的工具,并且知道我们有一个约定,其中 __invoke 函数只需要一个参数,该参数应该是 Query 接口的实现者。

要获取查询处理程序的返回值,我们需要使用已处理戳记,该戳记会将消息标记为已处理,并使我们能够访问返回值,此时我们知道该返回值应该是响应实现者。

在 config/service.yaml 中,我们可以给查询处理程序的任何实例打上标签internal.query_handler,并让服务容器将所有打上标签的实例注入到内存查询总线中。



services:
    _defaults:
        autowire: true 
        autoconfigure: true

    _instanceof:
        ...

        App\Shared\Domain\Bus\Query\QueryHandler:
            tags: ['internal.query_handler']
        ...
    ### Buses
    ...

    App\Shared\Domain\Bus\Query\QueryBus:
        class: App\Shared\Infrastructure\Bus\Query\InMemoryQueryBus
        arguments: [ !tagged internal.query_handler ]


Enter fullscreen mode Exit fullscreen mode

一切准备就绪后,我们就可以开始创建查询了,例如:

查询实现



<?php

declare(strict_types=1);

namespace App\EmailSender\Application\FindEmail;

use ...

final class FindEmailQuery implements Query
{
    public function __construct(private readonly int $id)
    {
    }

    public function id() : int
    {
        return $this->id;
    }
}


Enter fullscreen mode Exit fullscreen mode

我们将向“查找电子邮件”函数(查询处理程序的一个实现者)发送一个包含待查找电子邮件 ID 的简单查询。该函数拥有足够的电子邮件信息来查找邮件,并能根据所需信息构建响应。



<?php

declare(strict_types=1);

namespace App\EmailSender\Application\FindEmail;

use ...

final class FindEmail implements QueryHandler
{
    public function __construct(private EmailRepository $repository)
    {
    }

    public function __invoke(FindEmailQuery $query) : FindEmailResponse
    {
        $email = $this->repository->findById(
            EmailId::fromInt(
                $query->id(),
            ),
        );

        if ($email === null) {
            throw new InvalidArgumentException('Email unreachable');
        }

        return new FindEmailResponse(
            email: $email,
        );
    }
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\EmailSender\Application\FindEmail;

use ...

final class FindEmailResponse implements Response
{
    public function __construct(private readonly EmailDto $email)
    {
    }

    public function email() : EmailDto
    {
        return $this->email;
    }
}


Enter fullscreen mode Exit fullscreen mode

最后,我们可以在任何 Action 类中使用查询总线。



<?php

declare(strict_types=1);

namespace App\EmailSender\Infrastructure\Http;

use ...

class GetEmailAction
{
    public function __construct(
        private GetEmailResponder $responder,
        private QueryBus $queryBus,
    ) {
    }

    public function __invoke(Request $request, int $id) : Response
    {
        try {
            /** @var FindEmailResponse $findEmailResponse */
            $findEmailResponse = $this->queryBus->ask(
                new FindEmailQuery(id: $id)
            );

            $email = $findEmailResponse->email();

            $this->responder->loadEmail($email);
        } catch (Exception $e) {
            $this->responder->loadError($e->getMessage());
        }

        return $this->responder->response();
    }
}


Enter fullscreen mode Exit fullscreen mode

再次强调,行动知道自己在寻找什么,但它并不知道获取目标的完整过程。

结论

我们可以利用 Symfony 的组件轻松实现 CQRS 模式,创建自定义消息总线并定义一个可在整个应用程序中复用的模型。CQRS 可以帮助我们将操作和搜索关注点分离到描述性的命令/查询类中,从而构建隔离性更好的进程,并使类更易于修改。

请查看代码。

GitHub 标志 AdGARAY / cqrs-symfony

Symfony Messenger 的 CQRS 示例

CQRS 与 Symfony Messenger

要求

  • Docker Compose

设置

初始化容器

$ docker compose up -d
Enter fullscreen mode Exit fullscreen mode

进入 PHP 容器

$ docker compose exec -it php bash
Enter fullscreen mode Exit fullscreen mode

安装 Composer 依赖项

/var/www/html# $ composer install
Enter fullscreen mode Exit fullscreen mode

运行迁移

/var/www/html# $ php bin/console doctrine:migrations:migrate --no-interaction
Enter fullscreen mode Exit fullscreen mode

访问localhost:8080

如果您想逐步操作,php 镜像中已经配置了 xDebug 监听9003服务器名称对应的端口。serverName=application

请访问dev.to/adgaray查看完整帖子西班牙语版本。




文章来源:https://dev.to/adgaray/cqrs-with-symfony-messenger-2h3g