发布于 2026-01-06 0 阅读
0

第三部分:将 NestJS 和 NATS 集成到应用程序的技术

第三部分:将 NestJS 和 NATS 集成到应用程序的技术

John 是 NestJS 核心团队的成员。

第三部分:解决消息不兼容问题

在本系列的第二部分中,我们描述了使用 NATS 作为中介将 Nest 应用与非 Nest 应用集成所面临的挑战。

本文将探讨解决该问题的方法。

获取代码

提醒:本文所有代码均可在此处获取,完整说明请点击此处

序列化和反序列化

序列化和反序列化是 Nest 用于在不同消息格式之间进行转换的步骤。换句话说,Nest 接收来自消息代理的入站消息,对其进行反序列化(将任意消息格式转换为 Nest 消息格式),然后进行处理。对于出站消息,流程则相反:在将消息发送到消息代理之前,最后一步是(可选地)对其进行序列化(将 Nest 消息格式转换为其他格式)。

Nest 的巧妙之处在于,它不仅能自动完成所有这些操作,还提供了接口,方便你轻松插入自己的序列化器和反序列化器来处理任意消息格式。序列化器和反序列化器其实就是实现了特定接口的类。

由于请求者和响应者各自扮演着独特的角色(各自拥有不同的上下文),并且既是消息发送者又是消息接收者,因此序列化/反序列化操作会在四个不同的钩子点进行。以下示例将详细介绍这些钩子点。如果您要为本系列文章中某个章节收藏以便日后参考,那么这部分内容绝对值得您收藏。每个示例都详细描述了处理序列化/反序列化任务的具体位置,并提供了完整的上下文信息,帮助您理解操作的原因和方法。

序列化/反序列化钩子和配方

让我们来考察两种不同的上下文(请求者和响应者),我们需要在这两种上下文中进行序列化/反序列化。对于每种上下文,我们将分别探讨如何处理入站和出站消息。

Nest 作为请求者

Nest 请求者存在于实例的上下文中ClientProxy。因此,我们可以通过修改ClientProxy行为来定制序列化/反序列化行为。例如:



// app.controller.ts
@Client({
  transport: Transport.NATS,
  options: {
    url: 'nats://localhost:4222',
    serializer: new OutboundRequestSerializer(),
    deserializer: new InboundResponseDeserializer(),
  }
})
client: ClientProxy;


Enter fullscreen mode Exit fullscreen mode

在上面的代码片段中, ` OutboundRequestSerializerNestRequest` 类回答了“我的 Nest 请求器如何格式化传出的请求,以便外部应用程序可以处理它们?”这个问题,而InboundResponseDeserializer`NestRequest` 类则回答了“我的请求器如何转换传入的外部响应,以便 Nest 可以处理它们?”这个问题

注意:你可以为这些类选择任何名称,但相信我,强烈建议你选择与上面类似的命名规则,以便于区分!

Nest 作为响应者

Nest 响应器运行在微服务实例的上下文中。因此,我们通过修改微服务实例的行为来定制序列化/反序列化行为。例如:



// main.ts
async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.NATS,
    options: {
      url: 'nats://localhost:4222',
      deserializer: new InboundMessageDeserializer(),
      serializer: new OutboundResponseSerializer(),
    },
  });
  app.listen(() => console.log('Microservice is listening...'));
}


Enter fullscreen mode Exit fullscreen mode

所以InboundMessageDeserializer,该类回答了“我的响应器如何转换传入的外部消息,以便 Nest 可以处理它们?”这个问题,而OutboundResponseSerializer该类则处理了“我的响应器如何格式化响应,以便外部应用程序可以理解它们?”这个问题

此功能可生成以下图表——这些图表消除了上一个图表中所有难看的红色 X

以下是在为 Nest 响应器编写序列化器/反序列化器时需要记住的图表

Nest Responder(反)序列化

图 1:Nest Responder(反)序列化

在上图中,Nest 响应程序的序列化器和反序列化器在NestFactory.creatMicroservice()调用中进行了配置。

这是Nest 请求者的相应图表。

Nest 请求者(反)序列化

图 2:嵌套请求者(反)序列化

在上图中,Nest 请求器序列化器和反序列化器在ClientProxy配置中进行配置(装饰器或可注入器,取决于您使用的方法)。

实现序列化器和反序列化器

在开始编写序列化器/反序列化器类之前,让我们先来看一下它们的接口。

序列化器接口

序列化器类需要实现该serialize()方法,该方法接受一个参数(要序列化的出站有效负载),并返回序列化的有效负载。



export interface Serializer<TInput = any, TOutput = any> {
  serialize(value: TInput): TOutput;
}


Enter fullscreen mode Exit fullscreen mode

接下来,我们将为 Nest 响应器实现一个身份序列化器作为练习。身份序列化器会简单地返回它接收到的相同消息。除了练习这个接口之外,我们还可以用它来“监视”Nest 始终代表我们执行的、原本静默的序列化函数。

响应者身份序列化器实现

在这里,我们将在nestMicroservice项目(即 nest-nats-sample/nestMicroservice 目录)中工作。请查看该src/common/serializers/outbound-response-identity.serializer.ts文件:



// src/common/serializers/outbount-response-identity.serializer.ts
import { Serializer, OutgoingResponse } from '@nestjs/microservices';
import { Logger } from '@nestjs/common';

export class OutboundResponseIdentitySerializer implements Serializer {
  private readonly logger = new Logger('OutboundResponseIdentitySerializer');
  serialize(value: any): OutgoingResponse {
    this.logger.debug(
      `-->> Serializing outbound response: \n${JSON.stringify(value)}`
    );
    return value;
  }
}


Enter fullscreen mode Exit fullscreen mode

现在只需将此序列化器插入到main.tsNest 响应器应用程序的文件中即可。如果您正在按照 GitHub 代码库进行操作,则该main.ts文件中已经包含了所有这些代码(以及我们稍后会介绍的更多代码),但都被注释掉了。只需取消注释下面所示的行即可。



// src/main.ts
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.NATS,
  options: {
    queue: 'customers',
    url: 'nats://localhost:4222',
    /**
      * Use the "Identity" (de)serializers for observing messages for
      * nest-only deployment.
      */
    serializer: new OutboundResponseIdentitySerializer(),
  },
});


Enter fullscreen mode Exit fullscreen mode

如果您现在从nestHttpApp向nestMicroservice应用发送一些请求方法如下),您将看到类似这样的日志信息,其中显示了出站消息的内部布局:



[Nest] 8786   - 02/04/2020, 10:19:04 AM   [OutboundResponseIdentitySerializer] -->> Serializing outbound response:
{"err":null,"response":[],"isDisposed":true,"id":"41da72e8-09d0-4720-9404-bd0977b034a0"}


Enter fullscreen mode Exit fullscreen mode
解串口接口

现在我们来看一下反序列化器所需的接口。



export interface Deserializer<TInput = any, TOutput = any> {
  deserialize(value: TInput, options?: Record<string, any>): TOutput;
}


Enter fullscreen mode Exit fullscreen mode

反序列化器类需要实现该deserialize()方法,该方法接受两个参数——要反序列化的有效负载和一个可选options对象——并返回反序列化的有效负载。

options对象包含有关传入消息的元数据。对于 NATS,该对象包含消息属性的值replyTo(如果存在)。

让我们来实现一个反序列化器。首先,我们将构建一个“身份反序列化器”,它只会记录任何传入消息的内容,而不对其进行任何转换。

响应者身份反序列化器实现

我们仍在开发 Nest 响应器——nestMicroservice项目(即 nest-nats-sample/nestMicroservice 目录)。请查看该src/common/deserializers/inbound-message-identity.deserializer.ts文件:



// src/common/serializers/inbound-message-identity.deserializer.ts
import { ConsumerDeserializer, IncomingRequest } from '@nestjs/microservices';
import { Logger } from '@nestjs/common';

export class InboundMessageIdentityDeserializer
  implements ConsumerDeserializer {
  private readonly logger = new Logger('InboundMessageIdentityDeserializer');

  deserialize(value: any, options?: Record<string, any>): IncomingRequest {
    this.logger.verbose(
      `<<-- deserializing inbound message:\n${JSON.stringify(
        value
      )}\n\twith options: ${JSON.stringify(options)}`
    );
    return value;
  }
}


Enter fullscreen mode Exit fullscreen mode

就像我们的身份序列化器一样,我们可以快速插入一个身份反序列化器来监视来自请求者的消息。同样,如果您正在使用 GitHub 代码库进行操作,只需注释掉相应的行,使激活的(反)序列化器与下面显示的匹配即可。



// src/main.ts
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.NATS,
  options: {
    queue: 'customers',
    url: 'nats://localhost:4222',
    /**
      * Use the "Identity" (de)serializers for observing messages for
      * nest-only deployment.
      */
    serializer: new OutboundResponseIdentitySerializer(),
    deserializer: new InboundMessageIdentityDeserializer(),
  },
});


Enter fullscreen mode Exit fullscreen mode

如果您再次从nestHttpApp向nestMicroservice应用发送一些请求像这样),您将看到类似这样的日志信息,其中显示了入站消息的布局:



[Nest] 8786   - 02/04/2020, 10:19:04 AM   [InboundMessageIdentityDeserializer] <<-- deserializing inbound message:
{"pattern":"get-customers","data":{},"id":"41da72e8-09d0-4720-9404-bd0977b034a0"}
        with options: {"channel":"get-customers","replyTo":"_INBOX.IDML09N4JB6W0MF4P6GBPB.IDML09N4JB6W0MF4P6GCX2"}


Enter fullscreen mode Exit fullscreen mode

请注意日志输出中该字段的值replyTo。您可能已经猜到它是什么了。这个细节以后会派上用场,所以请记下来以备将来参考。

实施集成用例

我们即将到达最终目的地。凭借我们所获得的理解,我们可以具体说明完成第一部分图 1 中的集成用例需要哪些内容。

以下是具体要求:

  1. 作为请求者的nestHttpApp必须实现:

    • 一个)一个用于将 Nest 格式的请求转换为我们的外部服务可以理解的出站消息外部序列化器。例如:
      • 来自 Nest 格式
        • 话题:'get-customers'
        • 回复主题:'_INBOX.XVRL...'
        • 有效载荷:{pattern: 'get-customers', data: {}, id: 'abc...'}
      • 转换为外部格式
        • 话题:'get-customers'
        • 回复主题:'_INBOX.XVRL...'
        • 有效载荷:{}
    • B)一个外部响应反序列化器,用于将外部响应转换为 Nest 可以理解的格式。例如:
      • 来自外部格式
        • 话题:'_INBOX.XVRL...'
        • 有效载荷:{customers: [{id: 1, name: 'nestjs.com'}]}
      • 嵌套格式
        • 话题:'_INBOX.XVRL...'
        • 有效载荷:{err: undefined, response: {customers: [{id: 1, name: 'nestjs.com'}]}, isDisposed: true}
  2. 作为响应方的nestMicroservice应用必须实现:

    • 一个)一个入站消息外部反序列化器,用于将外部请求转换为 Nest 可以理解的格式。例如:
      • 来自外部格式
        • 话题:'get-customers'
        • 回复:'_INBOX.XVRL...'
        • 有效载荷:{}
      • 嵌套格式
        • 话题'get-customers'
        • 回复主题:'_INBOX.XVRL...'
        • 有效载荷:{pattern: 'get-customers', data: {}, id: 'abc...'}
    • B)一个用于将 Nest 格式的响应转换为我们的外部服务可以理解的响应的外部序列化程序。例如:
      • 来自 Nest 格式
        • 话题:'_INBOX.XVRL...'
        • 有效载荷:{err: undefined, response: {customers: [{id: 1, name: 'nestjs.com'}]}, isDisposed: true}
      • 转换为外部格式
        • 话题:'_INBOX.XVRL...'
        • 有效载荷:{customers: [{id: 1, name: 'nestjs.com'}]}

鉴于此,让我们开始行动吧!以上加粗的描述是我们课程的名称。

对于需求 1-A,我们src/common/serializers/outbound-message-external.serializer.tsnestHttpApp项目中已经实现了。以下是代码。注释解释了其意图。



// src/common/serializers/outbound-message-external.serializer.ts
import { Logger } from '@nestjs/common';
import { Serializer } from '@nestjs/microservices';

export class OutboundMessageExternalSerializer implements Serializer {
  private readonly logger = new Logger('OutboundMessageExternalSerializer');
  serialize(value: any) {
    this.logger.debug(
      `-->> Serializing outbound message: \n${JSON.stringify(value)}`,
    );

    /**
     * Here, we are merely "unpacking" the request payload from the Nest
     * message structure and returning it as a "plain" top-level object.
     */
    return value.data;
  }
}


Enter fullscreen mode Exit fullscreen mode

对于需求 1-B,我们src/common/deserializers/inbound-response-external.deserializer.tsnestHttpApp项目中实现了。以下是代码。注释解释了其意图。



// src/common/deserializers/inbound-response-external.deserializer.ts
import { WritePacket, Deserializer } from '@nestjs/microservices';
import { Logger } from '@nestjs/common';

export class InboundResponseExternalDeserializer implements Deserializer {
  private readonly logger = new Logger('InboundResponseExternalDeserializer');

  deserialize(value: any): WritePacket {
    this.logger.verbose(
      `<<-- deserializing inbound response:\n${JSON.stringify(value)}`,
    );

    /**
     * Here, we wrap the external payload received in a standard Nest
     * response message.  Note that we have omitted the `id` field, as it
     * does not have any meaning from an external responder.  Because of this,
     * we have to also:
     *   1) implement the `Deserializer` interface instead of the
     *      `ProducerDeserializer` interface used in the identity deserializer
     *   2) return an object with the `WritePacket` interface, rather than
     *      the`IncomingResponse` interface used in the identity deserializer.
     */
    return {
      err: undefined,
      response: value,
      isDisposed: true,
    };
  }
}


Enter fullscreen mode Exit fullscreen mode

针对需求 2-A,我们src/common/deserializers/inbound-message-external.deserializer.tsnestMicroservice项目中实现了相关功能。以下是代码,注释解释了其意图。



// src/common/serializers/inbound-message-external.deserializer.ts
import { Logger } from '@nestjs/common';
import * as uuid from 'uuid/v4';
import { ConsumerDeserializer } from '@nestjs/microservices';

export class InboundMessageExternalDeserializer
  implements ConsumerDeserializer {
  private readonly logger = new Logger('InboundMessageExternalDeserializer');
  deserialize(value: any, options?: Record<string, any>) {
    this.logger.verbose(
      `<<-- deserializing inbound external message:\n${JSON.stringify(
        value,
      )}\n\twith options: ${JSON.stringify(options)}`,
    );

    /**
     * Here, we merely wrap our inbound message payload in the standard Nest
     * message structure.
     */
    return {
      pattern: undefined,
      data: value,
      id: uuid(),
    };
  }
}


Enter fullscreen mode Exit fullscreen mode

针对需求 2-B,我们src/common/serializers/outbound-response-external.serializer.tsnestMicroservice项目中实现了相关功能。以下是代码,注释解释了其意图。



// src/common/serializers/outbound-response-external.serializer.ts
import { Serializer, OutgoingResponse } from '@nestjs/microservices';
import { Logger } from '@nestjs/common';

export class OutboundResponseExternalSerializer implements Serializer {
  private readonly logger = new Logger('OutboundResponseExternalSerializer');
  serialize(value: any): OutgoingResponse {
    this.logger.debug(
      `-->> Serializing outbound response: \n${JSON.stringify(value)}`,
    );

    /**
     * Here, we are merely "unpacking" the response payload from the Nest
     * message structure, and returning it as a "plain" top-level object.
     */

    return value.response;
  }
}


Enter fullscreen mode Exit fullscreen mode

启用外部(反)序列化器

最后一步是将这些序列化器(反序列化器)插入到相应的“钩子点”。我们之前已经见过这种情况。对于 `<string>` 标签nestHttpApp,无论在哪里配置 `<string>` 标签,都需要进行此操作ClientProxy。在本例中,为了便于代码组织,我们使用 `<string>`标签@Client()中的装饰器来完成此操作src/app.controller.ts。要启用外部序列化器(反序列化器),请将该文件更新为如下所示:



// src/app.controller.ts
  @Client({
    transport: Transport.NATS,
    options: {
      url: 'nats://localhost:4222',
      /**
       * Use the "Identity" (de)serializers for observing messages for
       * nest-only deployment.
       */
      // serializer: new OutboundMessageIdentitySerializer(),
      // deserializer: new InboundResponseIdentityDeserializer(),

      /**
       * Use the "External" (de)serializers for transforming messages to/from
       * (only) an external responder
       */
      serializer: new OutboundMessageExternalSerializer(),
      deserializer: new InboundResponseExternalDeserializer(),
    },
  })
  custClient: ClientProxy;


Enter fullscreen mode Exit fullscreen mode

对于外部序列化器nestMicroservice,此操作在文件中完成src/main.ts。要启用外部序列化器,请将该文件更新为如下所示:



// src/main.ts
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.NATS,
    options: {
      queue: 'customers',
      url: 'nats://localhost:4222',
      /**
       * Use the "Identity" (de)serializers for observing messages for
       * nest-only deployment.
       */
      // serializer: new OutboundResponseIdentitySerializer(),
      // deserializer: new InboundMessageIdentityDeserializer(),

      /**
       * Use the "External" (de)serializers for transforming messages to/from
       * an external requestor
       */
      serializer: new OutboundResponseExternalSerializer(),
      deserializer: new InboundMessageExternalDeserializer(),
    },
  });


Enter fullscreen mode Exit fullscreen mode

此时,您应该能够在任意应用组合之间发送和接收请求。例如,从nestHttpAppcustomerService,从customerAppnestMicroservice ,以及所有其他组合。我强烈建议您立即执行此操作,并按照此处此处的说明同时运行所有组件*。请密切关注日志输出,其中显示了每一步的序列化/反序列化过程。

*我们尚未详细介绍其用法queues,但当您开始运行完整的混合配置(包含 Nest 组件和非 Nest 组件)时,您会注意到请求会在nestMicroservice应用和customerService应用之间随机路由。这是因为它们都属于NATS 队列。如果您想将请求发送到特定的响应器,则需要关闭另一个响应器,以确保消息能够发送到目标位置。我将在本系列的最后一篇文章中更详细地讨论 NATS 队列。

惊喜来了!我们已经巧妙地处理了图 1 中的情况 D。仔细想想,这其实并不意外,而且它本身也存在一些局限性。之所以可行,是因为我们现在使用规范的外部格式进行序列化和反序列化。因此,所有请求看起来都像是来自外部请求方,所有响应看起来都像是来自外部响应方。有了这一点,我们的序列化器和反序列化器就能应对所有组合。我们创建了一种“通用”消息协议。这种方法的局限性或许比较隐蔽,我们将在后面讨论(参见下文“下一步是什么? ”中的第 2 点)。

结论🚀

我们已经涵盖了很多内容。希望您现在既掌握了如何使用 Nest 微服务和 NATS 消息代理等工具将外部应用程序与 Nest 应用集成的概念框架,也掌握了一些可以实际操作的样板代码。其中一些概念也可以直接应用于其他 Nest 微服务传输器,但正如您所想,细节有所不同。如果您希望我在另一个系列中以这种方式介绍其他传输器,请在评论区留言,我会尽力安排。

接下来是什么?❓

还有一些比较细致的话题需要探讨,我将在本系列的下一篇文章中进行介绍。先透露一下,这些话题包括:

  1. 我们能否同时运行nestMicroservice应用和外部customerService应用,并在它们之间进行负载均衡?答案是肯定的!事实上,我们已经做到了,正如上文所述。使用 NATS分布式队列可以轻松实现这一点,我们将在下一篇文章中简要介绍。
  2. 我们已经讨论了在混合环境(Nest 和非 Nest 环境)中运行修改后的nestHttpApp的情况。那么,如果我们混合使用未经修改的 Nest 应用——也就是说,这些是已部署的应用,我们不想为了实现通用消息协议而对其进行任何修改(即,它们运行的​​是开箱即​​用的标准反序列化器)——会发生什么呢?它们能在这种混合环境中良好运行吗?剧透一下:可以!
  3. 那么事件呢?我们已经讨论了请求/响应式消息传递这种看似比较棘手的情况,但普通的事件又该如何处理呢?事实上,您可能已经注意到我们构建了一个添加客户的功能(请参阅'add-customer'代码中散布的消息和处理程序)。现在就去试试吧。您会发现结果好坏参半。我们能否让事件在这种混合环境中协同工作呢?剧透一下:可以!

敬请期待下一期节目,我们将解答这些以及其他精彩问题!😃

欢迎在下方评论区提问、评论或提出建议,或者只是打个招呼。也欢迎加入我们的Discord服务器,一起愉快地讨论 NestJS。我的 Discord 用户名是Y Prospect

文章来源:https://dev.to/nestjs/integrate-nestjs-with-external-services-using-microservice-transporters-part-3-4m20