CQRS 与 Symfony Messenger
介绍
使用 Symfony Messenger 实现 CQRS
结论
查看西班牙语版本
介绍
我们通常使用相同的数据结构来写入和查询系统中的信息。对于大型系统而言,这会导致数据结构更加庞大,因为它需要将读取和写入操作集成到同一个模型中。例如,在写入信息时,我们可能需要进行大量的验证以确保持久化信息的正确性;而查询这些信息时,为了检索经过筛选的数据或针对不同情况使用不同的数据结构,查询过程可能既复杂又繁琐。
CQRS 是一种将数据存储的读取操作和更新操作分离的模式。在应用程序中实施 CQRS 可以最大限度地提高其性能、可扩展性和安全性。迁移到 CQRS 所带来的灵活性使系统能够更好地随时间演进,并防止更新命令在域级别引发合并冲突。
模式
CQRS 将读取结构与写入模型分离,读取结构使用查询来读取数据,而写入模型使用命令来对数据进行操作。
命令应该基于任务,这意味着我们需要关注命令的操作。例如,在配送应用中,当用户下单时,我们会将操作命名为 OrderProductCommand,而不是 AddProductToClient 或 CreateNewOrderProduct。这样做也能使我们的应用层更加一致。
查询操作不会修改数据库。查询返回的 DTO 不包含任何领域知识。我们需要关注的是所需信息,而不是领域行为。
好处
独立扩展 。它允许读取模型和写入模型独立扩展。
优化的数据模式 。读取模型可以使用针对查询优化的模式,而写入模型可以使用针对更新优化的模式。
安全性 。更容易确保只有正确的域实体才能执行写入操作。
关注点分离 。复杂的业务逻辑放在写入模型中。读取模型可以很简单。
使用 Symfony Messenger 实现 CQRS
消息传递组件 帮助应用程序与其他应用程序或通过消息队列收发消息。它还允许我们定义自定义消息总线,包括消息类型和处理程序。
我们来谈谈软件架构。
指令总线
谈到命令,我们需要构建一个通用接口,以便消息总线能够管理并将其传输给各个处理程序。命令接口最终将成为每个命令的基础接口。每个命令处理程序的实现者都会对给定的命令执行操作,但命令本身并不知道执行的是哪个操作。此外,我们还将为命令总线创建一个接口,以便为我们的消息(命令)创建不同类型的传输器。在本例中,我们创建了一个内存命令总线,但如果需要,我们可以轻松扩展这个概念(例如:队列工作)。
我们的 Symfony 应用程序的源代码中将会出现类似这样的内容。
<?php
declare ( strict_types = 1 );
namespace App\Shared\Domain\Bus\Command ;
interface Command
{
}
Enter fullscreen mode
Exit fullscreen mode
<?php
declare ( strict_types = 1 );
namespace App\Shared\Domain\Bus\Command ;
interface CommandBus
{
public function dispatch ( Command $command ) : void ;
}
Enter fullscreen mode
Exit fullscreen mode
<?php
declare ( strict_types = 1 );
namespace App\Shared\Domain\Bus\Command ;
interface CommandHandler
{
}
Enter fullscreen mode
Exit fullscreen mode
<?php
declare ( strict_types = 1 );
namespace App\Shared\Infrastructure\Bus\Command ;
use ...
final class InMemoryCommandBus implements CommandBus
{
private MessageBus $bus ;
public function __construct (
iterable $commandHandlers
) {
$this -> bus = new MessageBus ([
new HandleMessageMiddleware (
new HandlersLocator (
HandlerBuilder :: fromCallables ( $commandHandlers ),
),
),
]);
}
/**
* @throws Throwable
*/
public function dispatch ( Command $command ): void
{
try {
$this -> bus -> dispatch ( $command );
} catch ( NoHandlerForMessageException $e ) {
throw new InvalidArgumentException ( sprintf ( 'The command has not a valid handler: %s' , $command :: class ));
} catch ( HandlerFailedException $e ) {
throw $e -> getPrevious ();
}
}
}
Enter fullscreen mode
Exit fullscreen mode
对于内存命令总线类,我们需要注册来自服务定义的每个命令处理器。我们将利用 Symfony 的一项名为 “服务标签 ”的功能,该功能由服务容器和 自动配置 提供。它允许我们标记服务,以便稍后调用。在 `config/services.yaml` 文件中,我们指示服务容器使用标签标记每个命令处理器接口的实例。 internal.command_handler之后,我们声明内存命令总线,并将所有命令处理器的实现者作为可迭代参数传递。命令总线将接收每个命令处理器,并声明预期命令及其对应的处理器。
parameters :
services :
_defaults :
autowire : true
autoconfigure : true
_instanceof :
App\Shared\Domain\Bus\Command\CommandHandler :
tags : [ ' internal.command_handler' ]
...
### Buses
App\Shared\Domain\Bus\Command\CommandBus :
class : App\Shared\Infrastructure\Bus\Command\InMemoryCommandBus
arguments : [ !tagged internal.command_handler ]
Enter fullscreen mode
Exit fullscreen mode
我们可以创建一个处理程序构建器工具,负责在命令处理程序实现者的 `__invoke` 方法中查找,并将第一个参数类型作为调用该处理程序所需的命令。此时,我们制定了一个约定:每个命令处理程序都必须是可调用的,并且只有一个参数,其类型为命令本身。
<?php
declare ( strict_types = 1 );
namespace App\Shared\Infrastructure\Bus ;
use ...
final class HandlerBuilder
{
/**
* @throws ReflectionException
*/
public static function fromCallables ( iterable $callables ) : array
{
$callablesHandlers = [];
foreach ( $callables as $callable ) {
$envelop = self :: extractFirstParam ( $callable );
if ( ! array_key_exists ( $envelop , $callablesHandlers )) {
$callablesHandlers [ self :: extractFirstParam ( $callable )] = [];
}
$callablesHandlers [ self :: extractFirstParam ( $callable )][] = $callable ;
}
return $callablesHandlers ;
}
/**
* @throws ReflectionException
*/
private static function extractFirstParam ( object | string $class ) : string | null
{
$reflection = new ReflectionClass ( $class );
$method = $reflection -> getMethod ( '__invoke' );
if ( $method -> getNumberOfParameters () === 1 ) {
return $method -> getParameters ()[ 0 ] -> getClass () ?-> getName ();
}
return null ;
}
}
Enter fullscreen mode
Exit fullscreen mode
有了这些信息,我们就可以开始构建命令了,例如:
<?php
declare ( strict_types = 1 );
namespace App\EmailSender\Application\Create ;
use ...
final class CreateEmailCommand implements Command
{
public function __construct (
private readonly string $sender ,
private readonly string $addressee ,
private readonly string $message ,
) {
}
public function sender (): string
{
return $this -> sender ;
}
public function addressee (): string
{
return $this -> addressee ;
}
public function message (): string
{
return $this -> message ;
}
}
Enter fullscreen mode
Exit fullscreen mode
我们可以创建一个创建电子邮件命令,它包含创建新电子邮件所需的信息,但它不知道创建电子邮件所需的具体流程。
<?php
declare ( strict_types = 1 );
namespace App\EmailSender\Application\Create ;
use ...
class CreateEmailCommandHandler implements CommandHandler
{
public function __construct ( private EmailRepository $repository )
{
}
public function __invoke ( CreateEmailCommand $command ) : EmailId {
$email = Email :: createNewEmail (
sender : new EmailAddress ( $command -> sender ()),
addressee : new EmailAddress ( $command -> addressee ()),
message : new Message ( $command -> message ()),
);
$this -> repository -> save ( $email );
return $email -> id ();
}
}
Enter fullscreen mode
Exit fullscreen mode
我们为之前的命令创建了一个处理程序,它知道我们将使用类型为“创建电子邮件命令”的唯一参数调用该对象,并且它知道创建新电子邮件所需的所有过程。
<?php
declare ( strict_types = 1 );
namespace App\EmailSender\Infrastructure\Http ;
use ...
class CreateEmailAction
{
public function __construct (
private readonly CreateEmailResponder $responder ,
private readonly CommandBus $commandBus ,
) {
}
public function __invoke ( Request $request ) : Response
{
try {
$this -> commandBus -> dispatch (
new CreateEmailCommand (
sender : $request -> request -> get ( 'sender' ),
addressee : $request -> request -> get ( 'addressee' ),
message : $request -> request -> get ( 'message' ),
),
);
} catch ( Exception $e ) {
$this -> responder -> loadError ( $e -> getMessage ());
}
return $this -> responder -> response ();
}
}
Enter fullscreen mode
Exit fullscreen mode
因此,我们可以轻松地将命令总线注入到 Action(控制器)类中并分发命令。Action 类本身并不知道应用程序核心发生了什么,但命令总线可以确保我们将命令发送到合适的 Handler,并由 Handler 执行相应的操作。请注意,我们知道将要执行的操作,它由命令名称提供。
查询总线
我们来看一下查询总线的模型。我们可以定义一个非常类似的架构,但现在我们需要在查询请求某些内容时返回一个值,这就需要引入响应的概念。响应可以是领域对象的集合,也可以是单个对象,或者任何其他类型的对象。决定响应是什么的是查询处理器,它知道需要生成哪些信息。
最后,我们得到类似这样的结果:
<?php
declare ( strict_types = 1 );
namespace App\Shared\Domain\Bus\Query ;
interface Query
{
}
Enter fullscreen mode
Exit fullscreen mode
<?php
declare ( strict_types = 1 );
namespace App\Shared\Domain\Bus\Query ;
interface QueryBus
{
public function ask ( Query $query ) : Response | null ;
}
Enter fullscreen mode
Exit fullscreen mode
<?php
declare ( strict_types = 1 );
namespace App\Shared\Domain\Bus\Query ;
interface QueryHandler
{
}
Enter fullscreen mode
Exit fullscreen mode
<?php
declare ( strict_types = 1 );
namespace App\Shared\Domain\Bus\Query ;
interface Response
{
}
Enter fullscreen mode
Exit fullscreen mode
<?php
declare ( strict_types = 1 );
namespace App\Shared\Infrastructure\Bus\Query ;
use ...
final class InMemoryQueryBus implements QueryBus
{
private MessageBus $bus ;
public function __construct ( iterable $queryHandlers )
{
$this -> bus = new MessageBus ([
new HandleMessageMiddleware (
new HandlersLocator (
HandlerBuilder :: fromCallables ( $queryHandlers ),
),
),
]);
}
public function ask ( Query $query ): Response | null
{
try {
/** @var HandledStamp $stamp */
$stamp = $this -> bus -> dispatch ( $query ) -> last ( HandledStamp :: class );
return $stamp -> getResult ();
} catch ( NoHandlerForMessageException $e ) {
throw new InvalidArgumentException ( sprintf ( 'The query has not a valid handler: %s' , $query :: class ));
}
}
}
Enter fullscreen mode
Exit fullscreen mode
我们可以使用相同的方法来注册查询处理程序和查询,使用构建处理程序的工具,并且知道我们有一个约定,其中 __invoke 函数只需要一个参数,该参数应该是 Query 接口的实现者。
要获取查询处理程序的返回值,我们需要使用 已处理戳记 ,该戳记会将消息标记为已处理,并使我们能够访问返回值,此时我们知道该返回值应该是响应实现者。
在 config/service.yaml 中,我们可以给查询处理程序的任何实例打上标签 internal.query_handler,并让服务容器将所有打上标签的实例注入到内存查询总线中。
services :
_defaults :
autowire : true
autoconfigure : true
_instanceof :
...
App\Shared\Domain\Bus\Query\QueryHandler :
tags : [ 'internal.query_handler' ]
...
### Buses
...
App\Shared\Domain\Bus\Query\QueryBus :
class : App\Shared\Infrastructure\Bus\Query\InMemoryQueryBus
arguments : [ ! tagged internal . query_handler ]
Enter fullscreen mode
Exit fullscreen mode
一切准备就绪后,我们就可以开始创建查询了,例如:
<?php
declare ( strict_types = 1 );
namespace App\EmailSender\Application\FindEmail ;
use ...
final class FindEmailQuery implements Query
{
public function __construct ( private readonly int $id )
{
}
public function id () : int
{
return $this -> id ;
}
}
Enter fullscreen mode
Exit fullscreen mode
我们将向“查找电子邮件”函数(查询处理程序的一个实现者)发送一个包含待查找电子邮件 ID 的简单查询。该函数拥有足够的电子邮件信息来查找邮件,并能根据所需信息构建响应。
<?php
declare ( strict_types = 1 );
namespace App\EmailSender\Application\FindEmail ;
use ...
final class FindEmail implements QueryHandler
{
public function __construct ( private EmailRepository $repository )
{
}
public function __invoke ( FindEmailQuery $query ) : FindEmailResponse
{
$email = $this -> repository -> findById (
EmailId :: fromInt (
$query -> id (),
),
);
if ( $email === null ) {
throw new InvalidArgumentException ( 'Email unreachable' );
}
return new FindEmailResponse (
email : $email ,
);
}
}
Enter fullscreen mode
Exit fullscreen mode
<?php
declare ( strict_types = 1 );
namespace App\EmailSender\Application\FindEmail ;
use ...
final class FindEmailResponse implements Response
{
public function __construct ( private readonly EmailDto $email )
{
}
public function email () : EmailDto
{
return $this -> email ;
}
}
Enter fullscreen mode
Exit fullscreen mode
最后,我们可以在任何 Action 类中使用查询总线。
<?php
declare ( strict_types = 1 );
namespace App\EmailSender\Infrastructure\Http ;
use ...
class GetEmailAction
{
public function __construct (
private GetEmailResponder $responder ,
private QueryBus $queryBus ,
) {
}
public function __invoke ( Request $request , int $id ) : Response
{
try {
/** @var FindEmailResponse $findEmailResponse */
$findEmailResponse = $this -> queryBus -> ask (
new FindEmailQuery ( id : $id )
);
$email = $findEmailResponse -> email ();
$this -> responder -> loadEmail ( $email );
} catch ( Exception $e ) {
$this -> responder -> loadError ( $e -> getMessage ());
}
return $this -> responder -> response ();
}
}
Enter fullscreen mode
Exit fullscreen mode
再次强调,行动知道自己在寻找什么,但它并不知道获取目标的完整过程。
结论
我们可以利用 Symfony 的组件轻松实现 CQRS 模式,创建自定义消息总线并定义一个可在整个应用程序中复用的模型。CQRS 可以帮助我们将操作和搜索关注点分离到描述性的命令/查询类中,从而构建隔离性更好的进程,并使类更易于修改。
请查看代码。
Symfony Messenger 的 CQRS 示例
CQRS 与 Symfony Messenger
要求
设置
初始化容器
$ docker compose up -d
Enter fullscreen mode
Exit fullscreen mode
进入 PHP 容器
$ docker compose exec -it php bash
Enter fullscreen mode
Exit fullscreen mode
安装 Composer 依赖项
/var/www/html# $ composer install
Enter fullscreen mode
Exit fullscreen mode
运行迁移
/var/www/html# $ php bin/console doctrine:migrations:migrate --no-interaction
Enter fullscreen mode
Exit fullscreen mode
访问 localhost:8080
如果您想逐步操作, php 镜像中已经配置了 xDebug 监听 9003服务器名称对应的端口。 serverName=application
请访问dev.to/adgaray 查看完整帖子 及 西班牙语版本。
文章来源:https://dev.to/adgaray/cqrs-with-symfony-messenger-2h3g