发布于 2026-01-06 0 阅读
0

Node后端服务器 - 使用Next.js、Node和RabbitMQ制作YouTube GIF动画

Node后端服务器 - 使用Next.js、Node和RabbitMQ制作YouTube GIF动画

大家好,
本文是“使用 Next.js、Node 和 RabbitMQ 制作 YouTube GIF”系列文章的第二部分。

本文将深入探讨如何构建 YouTube 转 GIF 转换器的后端服务器。本文会包含一些代码片段,但您可以在 GitHub 上访问整个项目,其中包含完整的源代码、额外的集成测试和 Swagger API 文档。您还可以查看应用演示。本文将涵盖以下主题。

功能

后端序列

如上图所示,后端服务器具有以下 3 个主要功能:

  • 通过在数据库中创建新的作业记录来处理 GIF 转换请求
  • 将事件分发到 RabbitMQ,表明已创建新的转换作业(任务排队)。
  • 通过从数据库中查询作业 ID 来处理作业获取请求,并返回相应的响应。

项目架构

我们的 Express 应用架构包含三个主要组件

  • 路由处理程序
  • 控制器
  • 服务

它们各自具有特定的功能,我们将详细讨论这些功能,并解释其结构如此设计的原因。

后端架构

  • 路由处理程序
    • 负责将流量路由到各自的路由处理程序。通常,这些路由处理程序由一个处理程序数组组成,我们称之为“中间件链”,该链中的最后一个处理程序就是路由控制器。
    • 中间件链通常负责对传入请求进行“检查”,并在某些情况下修改请求对象。在本例中,我们将使用自定义验证中间件进行验证。
  • 控制器
    • 从请求中提取数据,并在必要时对数据进行清理。
    • 将控制权委托给相关服务
    • 处理回复
    • 将错误委托给自定义错误处理中间件
  • 服务
    • 包含所有业务逻辑
    • 使用数据访问层(ORM/ODM)访问数据

控制器应该保持“愚钝”,也就是说,它们不应该了解任何业务逻辑的细节,它们只需要知道“哪个服务可以处理这个请求”、“这个服务需要哪些数据”、“响应应该是什么样的”。这样可以避免控制器臃肿。

执行

数据库架构

在这个项目中,我们使用了TypeORM,它是一个支持多种数据库的 TypeScript ORM(如本系列第一部分所述,我们将使用 MongoDB)。

我们将把每次 GIF 转换表示为一个 Job,这将是我们唯一的集合。TypeORM
中的 Job 集合如下所示。



import { BaseEntity, Entity, ObjectID, Column, CreateDateColumn, UpdateDateColumn, ObjectIdColumn } from 'typeorm';

@Entity('jobs')
export class Job extends BaseEntity {
  @ObjectIdColumn()
  id: ObjectID;

  @Column({
    nullable: false,
  })
  youtubeUrl: string;

  @Column({
    nullable: false,
  })
  youtubeId: string;

  @Column({
    nullable: true,
  })
  gifUrl: string;

  @Column({
    nullable: false,
  })
  startTime: number;

  @Column({
    nullable: false,
  })
  endTime: number;

  @Column({
    type: 'enum',
    enum: ['pending', 'processing', 'done', 'error'],
  })
  status: 'pending' | 'processing' | 'done' | 'error';

  @Column()
  @CreateDateColumn()
  createdAt: Date;

  @Column()
  @UpdateDateColumn()
  updatedAt: Date;
}


Enter fullscreen mode Exit fullscreen mode

这里需要注意的是状态字段,它本质上是一个枚举,用于指示 GIF 转换的当前状态。所有其他字段都是执行转换工作所需的标准数据。

路线处理

如前所述,我们只有两条路线。

  • 创建新的 GIF 转换任务的路径
  • 用于根据转换任务的 ID 获取该任务数据的路由,客户端稍后将使用该 ID 进行轮询。

这就是我们的路由处理程序的样子。



//routes.interface
import { Router } from 'express';

interface Route {
  path?: string;
  router: Router;
}

export default Route;


Enter fullscreen mode Exit fullscreen mode


//jobs.route.ts
import { Router } from 'express';
import { CreateJobDto } from '../../common/dtos/createJob.dto';
import Route from '../../common/interfaces/routes.interface';
import JobsController from '../../controllers/jobs.controller';
import validationMiddleware from '../middlewares/validation.middleware';

class JobsRoute implements Route {
  public path = '/jobs';
  public router = Router();

  constructor(private jobsController = new JobsController()) {
    this.initializeRoutes();
  }

  private initializeRoutes() {
    this.router.get(`${this.path}/:id`, this.jobsController.getJobById);
    this.router.post(`${this.path}`, validationMiddleware(CreateJobDto, 'body'), this.jobsController.createJob);
  }
}

export default JobsRoute;


Enter fullscreen mode Exit fullscreen mode

为了进行验证,我们使用自定义验证中间件,它使用类验证器类转换器来验证 DTO。



//createJob.dto
import { Expose } from 'class-transformer';
import { IsNotEmpty, IsNumber, IsString, Matches } from 'class-validator';
import { IsGreaterThan } from './validators/isGreaterThan';
import { MaximumDifference } from './validators/maximumDifference';

export class CreateJobDto {
  @IsNotEmpty()
  @IsString()
  @Matches(/^(?:https?:\/\/)?(?:www\.)?(?:youtu\.be\/|youtube\.com\/(?:embed\/|v\/|watch\?v=|watch\?.+&v=))((\w|-){11})(?:\S+)?$/, {
    message: 'Invalid youtube url',
  })
  @Expose()
  public youtubeUrl: string;

  @IsNotEmpty()
  @IsNumber()
  @Expose()
  public startTime: number;

  @IsNotEmpty()
  @IsNumber()
  @IsGreaterThan('startTime', {
    message: 'end time must be greater than start time',
  })
  @MaximumDifference('startTime', {
    message: 'maximum gif duration is 30 seconds',
  })
  @Expose()
  public endTime: number;
}


Enter fullscreen mode Exit fullscreen mode

请注意,IsGreaterThan 和 MaximumDifference 是自定义的 class-validator 验证装饰器,它们本质上看起来像这样(更多信息可以在class-validator 文档中找到)。



//isGreaterThan.ts
import { registerDecorator, ValidationOptions, ValidationArguments } from 'class-validator';

export function IsGreaterThan(property: string, validationOptions?: ValidationOptions) {
  return function (object: Object, propertyName: string) {
    registerDecorator({
      name: 'isGreaterThan',
      target: object.constructor,
      propertyName: propertyName,
      constraints: [property],
      options: validationOptions,
      validator: {
        validate(value: any, args: ValidationArguments) {
          const [relatedPropertyName] = args.constraints;
          const relatedValue = (args.object as any)[relatedPropertyName];
          return typeof value === 'number' && typeof relatedValue === 'number' && value > relatedValue; 
        },
      },
    });
  };
}


Enter fullscreen mode Exit fullscreen mode

MaximumDifference 看起来与此类似,但它的返回值却是这样的。



return typeof value === 'number' && typeof relatedValue === 'number' && value - relatedValue <= difference; 


Enter fullscreen mode Exit fullscreen mode

现在我们的验证中间件看起来像这样



validation.middleware.ts
import { plainToClass } from 'class-transformer';
import { validate, ValidationError } from 'class-validator';
import { RequestHandler } from 'express';

const validationMiddleware = (type: any, value: string | 'body' | 'query' | 'params' = 'body', skipMissingProperties = false): RequestHandler => {
  return (req, res, next) => {
    validate(plainToClass(type, req[value]), { skipMissingProperties }).then((errors: ValidationError[]) => {
      if (errors.length > 0) {
        const message = errors.map((error: ValidationError) => Object.values(error.constraints)).join(', ');
        res.status(400).send(message);
      } else {
        next();
      }
    });
  };
};

export default validationMiddleware;



Enter fullscreen mode Exit fullscreen mode

控制器

我们的控制器看起来相当标准,唯一需要注意的是,我们使用 class-transformer 中的 plainToClass 方法,并设置 excludeExtraneousValues: true,从请求体中提取 CreateJobDto 对象。这种方法只解构暴露的字段(CreateJobDto 类中使用了 @Expose() 装饰器)。更多相关信息请参阅class-transformer 文档。



//jobs.controllers.ts
import { plainToClass } from 'class-transformer';
import { NextFunction, Request, Response } from 'express';
import { CreateJobDto } from '../common/dtos/createJob.dto';
import { Job } from '../entities/jobs.entity';
import JobsService from '../services/jobs.service';

class JobsController {
  constructor(private jobService = new JobsService()) {}

  public createJob = async (req: Request, res: Response, next: NextFunction): Promise<void> => {
    try {
      const jobDto: CreateJobDto = plainToClass(CreateJobDto, req.body, { excludeExtraneousValues: true });
      const createdJob: Job = await this.jobService.createJob(jobDto);

      res.status(201).json(createdJob);
    } catch (error) {
      next(error);
    }
  };

  public getJobById = async (req: Request, res: Response, next: NextFunction): Promise<void> => {
    try {
      const jobId = req.params.id;
      const job: Job = await this.jobService.findJobById(jobId);

      const responseStatus = job.status === 'done' ? 200 : 202;
      res.status(responseStatus).json(job);
    } catch (error) {
      next(error);
    }
  };
}

export default JobsController;


Enter fullscreen mode Exit fullscreen mode

另需注意的是,当转换任务仍在处理中时,[GET] /job/{id} 的响应状态码为 202。有关此内容的更多信息,请参阅异步请求-响应模式。

如果发生错误,该错误将传递给错误中间件,它是我们 Express 中间件链中的最后一个中间件,其代码如下所示:



//error.middleware.ts
import { NextFunction, Request, Response } from 'express';

import { isBoom, Boom } from '@hapi/boom';
import { logger } from '../../common/utils/logger';

function errorMiddleware(error: Boom | Error, req: Request, res: Response, next: NextFunction) {
  const statusCode: number = isBoom(error) ? error.output.statusCode : 500;
  const errorMessage: string = isBoom(error) ? error.message : 'Something went wrong';
  logger.error(`StatusCode : ${statusCode}, Message : ${error}`);

  return res.status(statusCode).send(errorMessage);
}
export default errorMiddleware;



Enter fullscreen mode Exit fullscreen mode

您可能注意到我们导入了一个名为 Boom 的软件包,我们稍后会在“服务”部分详细介绍它。

服务

就业服务

JobService 包含所有业务逻辑,并可访问数据访问层,同时还与 RabbitMQ 服务通信,将事件分发到队列。



//jobs.service.ts
import * as Boom from '@hapi/boom';
import Container from 'typedi';
import { CreateJobDto } from '../common/dtos/createJob.dto';
import EventEmitter from '../common/utils/eventEmitter';
import { Job } from '../entities/jobs.entity';
import RabbitMQService from './rabbitmq.service';

class JobsService {
  private events = {
    JobCreated: 'JobCreated',
  };

  constructor() {
    this.intiializeEvents();
  }

  private intiializeEvents() {
    EventEmitter.on(this.events.JobCreated, (job: Job) => {
      const rabbitMQInstance = Container.get(RabbitMQService);
      rabbitMQInstance.sendToQueue(JSON.stringify(job));
    });
  }

  public async findJobById(jobId: string): Promise<Job> {
    const job: Job = await Job.findOne(jobId);
    if (!job) throw Boom.notFound();

    return job;
  }

  public async createJob(jobDto: CreateJobDto): Promise<Job> {
    const createdJob: Job = await Job.save({ ...jobDto, youtubeId: jobDto.youtubeUrl.split('v=')[1]?.slice(0, 11), status: 'pending' } as Job);
    EventEmitter.emit(this.events.JobCreated, createdJob);
    return createdJob;
  }
}

export default JobsService;


Enter fullscreen mode Exit fullscreen mode

首先,您可能会看到两个不熟悉的导入语句,我们将快速浏览一下,然后在本类中详细解释每个函数。

  • 繁荣
    • 用于创建具有强大、简洁且友好接口的 HTTP 对象。您可以看到抛出 404 Not Found 错误对象是多么容易。
  • 类型
    • TypeDI 是一个功能强大的依赖注入包,它拥有许多特性。其中一项特性就是单例服务,这也是我们在这里使用它的方式。

现在让我们更详细地了解一下该类中的一些功能。

初始化事件()

此函数使用一个全局 EventEmitter,我们在整个项目中使用它来添加发布/订阅层。它非常简单。



//eventEmitter.ts
import { EventEmitter } from 'events';
export default new EventEmitter();


Enter fullscreen mode Exit fullscreen mode

现在我们可以开始监听事件,特别是稍后在创建新工作时发出的名为“JobCreated”的事件。



  // Defines all the events in our service
  private events = {
    JobCreated: 'JobCreated',
  };

  private intiializeEvents() {
    // Start listening for the event 'JobCreated'
    EventEmitter.on(this.events.JobCreated, (job: Job) => {
    // Get a singleton instance of our RabbitMQService
      const rabbitMQInstance = Container.get(RabbitMQService);
    // Dispatch an event containing the data of the created job
      rabbitMQInstance.sendToQueue(JSON.stringify(job));
    });
  }


Enter fullscreen mode Exit fullscreen mode

有关向 Express 后端添加发布/订阅层的更多信息,请参阅相关文档。

创建作业()

这个函数只做两件事。

  • 在数据库中创建新的职位文档
  • 发送一个名为“JobCreated”的事件,表示已创建新作业,这样事件监听器就会处理将该事件分发给 RabbitMQ 服务的逻辑。

RabbitMQ 服务

该服务负责连接到 RabbitMQ 服务器,创建通道并初始化队列,该队列将用于生成任务(将由我们的服务工作线程使用)。

我们使用amqplib作为 RabbitMQ 服务器的客户端。



//rabbitmq.service.ts
import { Service } from 'typedi';
import amqp, { Channel, Connection } from 'amqplib';
import { logger } from '../common/utils/logger';

@Service()
export default class RabbitMQService {
  private connection: Connection;
  private channel: Channel;
  private queueName = 'ytgif-jobs';
  constructor() {
    this.initializeService();
  }

  private async initializeService() {
    try {
      await this.initializeConnection();
      await this.initializeChannel();
      await this.initializeQueues();
    } catch (err) {
      logger.error(err);
    }
  }
  private async initializeConnection() {
    try {
      this.connection = await amqp.connect(process.env.NODE_ENV === 'production' ? process.env.RABBITMQ_PROD : process.env.RABBITMQ_DEV);
      logger.info('Connected to RabbitMQ Server');
    } catch (err) {
      throw err;
    }
  }

  private async initializeChannel() {
    try {
      this.channel = await this.connection.createChannel();
      logger.info('Created RabbitMQ Channel');
    } catch (err) {
      throw err;
    }
  }

  private async initializeQueues() {
    try {
      await this.channel.assertQueue(this.queueName, {
        durable: true,
      });
      logger.info('Initialized RabbitMQ Queues');
    } catch (err) {
      throw err;
    }
  }

  public async sendToQueue(message: string) {
    this.channel.sendToQueue(this.queueName, Buffer.from(message), {
      persistent: true,
    });
    logger.info(`sent: ${message} to queue ${this.queueName}`);
  }
}



Enter fullscreen mode Exit fullscreen mode

用于引导连接/通道/队列的代码相当标准,您可以在RabbitMQ 文档anqplib 文档中找到这些函数的参考资料。我们需要从类外部使用的唯一函数是`sendToQueue()`,它用于将消息分发到我们的任务队列,就像在 JobService 中通过分发字符串化的 Job 对象来实现的那样。



 rabbitMQInstance.sendToQueue(JSON.stringify(job));


Enter fullscreen mode Exit fullscreen mode

现在我们只需要在应用程序启动时像这样初始化 RabbitMQ 服务即可。



import Container from 'typedi';

// Call initializeRabbitMQ() somewhere when starting the app
private initializeRabbitMQ() {
    Container.get(RabbitMqService);
  }


Enter fullscreen mode Exit fullscreen mode

现在后端服务的工作已经完成,剩下的就是节点服务工作进程处理任务队列并进行实际的 GIF 转换。

请记住,完整的源代码可以在GitHub 仓库中查看。

在本系列的下一部分中,我们将看到如何实现一个节点服务工作线程,它将消费任务队列并执行实际的 YouTube 到 GIF 转换。

文章来源:https://dev.to/ragrag/node-backend-server-youtube-gif-maker-using-next-js-node-and-rabbitmq-47h7