Node后端服务器 - 使用Next.js、Node和RabbitMQ制作YouTube GIF动画
大家好,
本文是“使用 Next.js、Node 和 RabbitMQ 制作 YouTube GIF”系列文章的第二部分。
本文将深入探讨如何构建 YouTube 转 GIF 转换器的后端服务器。本文会包含一些代码片段,但您可以在 GitHub 上访问整个项目,其中包含完整的源代码、额外的集成测试和 Swagger API 文档。您还可以查看应用演示。本文将涵盖以下主题。
功能
如上图所示,后端服务器具有以下 3 个主要功能:
- 通过在数据库中创建新的作业记录来处理 GIF 转换请求
- 将事件分发到 RabbitMQ,表明已创建新的转换作业(任务排队)。
- 通过从数据库中查询作业 ID 来处理作业获取请求,并返回相应的响应。
项目架构
我们的 Express 应用架构包含三个主要组件
- 路由处理程序
- 控制器
- 服务
它们各自具有特定的功能,我们将详细讨论这些功能,并解释其结构如此设计的原因。
- 路由处理程序
- 负责将流量路由到各自的路由处理程序。通常,这些路由处理程序由一个处理程序数组组成,我们称之为“中间件链”,该链中的最后一个处理程序就是路由控制器。
- 中间件链通常负责对传入请求进行“检查”,并在某些情况下修改请求对象。在本例中,我们将使用自定义验证中间件进行验证。
- 控制器
- 从请求中提取数据,并在必要时对数据进行清理。
- 将控制权委托给相关服务
- 处理回复
- 将错误委托给自定义错误处理中间件
- 服务
- 包含所有业务逻辑
- 使用数据访问层(ORM/ODM)访问数据
控制器应该保持“愚钝”,也就是说,它们不应该了解任何业务逻辑的细节,它们只需要知道“哪个服务可以处理这个请求”、“这个服务需要哪些数据”、“响应应该是什么样的”。这样可以避免控制器臃肿。
执行
数据库架构
在这个项目中,我们使用了TypeORM,它是一个支持多种数据库的 TypeScript ORM(如本系列第一部分所述,我们将使用 MongoDB)。
我们将把每次 GIF 转换表示为一个 Job,这将是我们唯一的集合。TypeORM
中的 Job 集合如下所示。
import { BaseEntity, Entity, ObjectID, Column, CreateDateColumn, UpdateDateColumn, ObjectIdColumn } from 'typeorm';
@Entity('jobs')
export class Job extends BaseEntity {
@ObjectIdColumn()
id: ObjectID;
@Column({
nullable: false,
})
youtubeUrl: string;
@Column({
nullable: false,
})
youtubeId: string;
@Column({
nullable: true,
})
gifUrl: string;
@Column({
nullable: false,
})
startTime: number;
@Column({
nullable: false,
})
endTime: number;
@Column({
type: 'enum',
enum: ['pending', 'processing', 'done', 'error'],
})
status: 'pending' | 'processing' | 'done' | 'error';
@Column()
@CreateDateColumn()
createdAt: Date;
@Column()
@UpdateDateColumn()
updatedAt: Date;
}
这里需要注意的是状态字段,它本质上是一个枚举,用于指示 GIF 转换的当前状态。所有其他字段都是执行转换工作所需的标准数据。
路线处理
如前所述,我们只有两条路线。
- 创建新的 GIF 转换任务的路径
- 用于根据转换任务的 ID 获取该任务数据的路由,客户端稍后将使用该 ID 进行轮询。
这就是我们的路由处理程序的样子。
//routes.interface
import { Router } from 'express';
interface Route {
path?: string;
router: Router;
}
export default Route;
//jobs.route.ts
import { Router } from 'express';
import { CreateJobDto } from '../../common/dtos/createJob.dto';
import Route from '../../common/interfaces/routes.interface';
import JobsController from '../../controllers/jobs.controller';
import validationMiddleware from '../middlewares/validation.middleware';
class JobsRoute implements Route {
public path = '/jobs';
public router = Router();
constructor(private jobsController = new JobsController()) {
this.initializeRoutes();
}
private initializeRoutes() {
this.router.get(`${this.path}/:id`, this.jobsController.getJobById);
this.router.post(`${this.path}`, validationMiddleware(CreateJobDto, 'body'), this.jobsController.createJob);
}
}
export default JobsRoute;
为了进行验证,我们使用自定义验证中间件,它使用类验证器和类转换器来验证 DTO。
//createJob.dto
import { Expose } from 'class-transformer';
import { IsNotEmpty, IsNumber, IsString, Matches } from 'class-validator';
import { IsGreaterThan } from './validators/isGreaterThan';
import { MaximumDifference } from './validators/maximumDifference';
export class CreateJobDto {
@IsNotEmpty()
@IsString()
@Matches(/^(?:https?:\/\/)?(?:www\.)?(?:youtu\.be\/|youtube\.com\/(?:embed\/|v\/|watch\?v=|watch\?.+&v=))((\w|-){11})(?:\S+)?$/, {
message: 'Invalid youtube url',
})
@Expose()
public youtubeUrl: string;
@IsNotEmpty()
@IsNumber()
@Expose()
public startTime: number;
@IsNotEmpty()
@IsNumber()
@IsGreaterThan('startTime', {
message: 'end time must be greater than start time',
})
@MaximumDifference('startTime', {
message: 'maximum gif duration is 30 seconds',
})
@Expose()
public endTime: number;
}
//isGreaterThan.ts
import { registerDecorator, ValidationOptions, ValidationArguments } from 'class-validator';
export function IsGreaterThan(property: string, validationOptions?: ValidationOptions) {
return function (object: Object, propertyName: string) {
registerDecorator({
name: 'isGreaterThan',
target: object.constructor,
propertyName: propertyName,
constraints: [property],
options: validationOptions,
validator: {
validate(value: any, args: ValidationArguments) {
const [relatedPropertyName] = args.constraints;
const relatedValue = (args.object as any)[relatedPropertyName];
return typeof value === 'number' && typeof relatedValue === 'number' && value > relatedValue;
},
},
});
};
}
MaximumDifference 看起来与此类似,但它的返回值却是这样的。
return typeof value === 'number' && typeof relatedValue === 'number' && value - relatedValue <= difference;
现在我们的验证中间件看起来像这样
validation.middleware.ts
import { plainToClass } from 'class-transformer';
import { validate, ValidationError } from 'class-validator';
import { RequestHandler } from 'express';
const validationMiddleware = (type: any, value: string | 'body' | 'query' | 'params' = 'body', skipMissingProperties = false): RequestHandler => {
return (req, res, next) => {
validate(plainToClass(type, req[value]), { skipMissingProperties }).then((errors: ValidationError[]) => {
if (errors.length > 0) {
const message = errors.map((error: ValidationError) => Object.values(error.constraints)).join(', ');
res.status(400).send(message);
} else {
next();
}
});
};
};
export default validationMiddleware;
控制器
我们的控制器看起来相当标准,唯一需要注意的是,我们使用 class-transformer 中的 plainToClass 方法,并设置 excludeExtraneousValues: true,从请求体中提取 CreateJobDto 对象。这种方法只解构暴露的字段(CreateJobDto 类中使用了 @Expose() 装饰器)。更多相关信息请参阅class-transformer 文档。
//jobs.controllers.ts
import { plainToClass } from 'class-transformer';
import { NextFunction, Request, Response } from 'express';
import { CreateJobDto } from '../common/dtos/createJob.dto';
import { Job } from '../entities/jobs.entity';
import JobsService from '../services/jobs.service';
class JobsController {
constructor(private jobService = new JobsService()) {}
public createJob = async (req: Request, res: Response, next: NextFunction): Promise<void> => {
try {
const jobDto: CreateJobDto = plainToClass(CreateJobDto, req.body, { excludeExtraneousValues: true });
const createdJob: Job = await this.jobService.createJob(jobDto);
res.status(201).json(createdJob);
} catch (error) {
next(error);
}
};
public getJobById = async (req: Request, res: Response, next: NextFunction): Promise<void> => {
try {
const jobId = req.params.id;
const job: Job = await this.jobService.findJobById(jobId);
const responseStatus = job.status === 'done' ? 200 : 202;
res.status(responseStatus).json(job);
} catch (error) {
next(error);
}
};
}
export default JobsController;
另需注意的是,当转换任务仍在处理中时,[GET] /job/{id} 的响应状态码为 202。有关此内容的更多信息,请参阅异步请求-响应模式。
如果发生错误,该错误将传递给错误中间件,它是我们 Express 中间件链中的最后一个中间件,其代码如下所示:
//error.middleware.ts
import { NextFunction, Request, Response } from 'express';
import { isBoom, Boom } from '@hapi/boom';
import { logger } from '../../common/utils/logger';
function errorMiddleware(error: Boom | Error, req: Request, res: Response, next: NextFunction) {
const statusCode: number = isBoom(error) ? error.output.statusCode : 500;
const errorMessage: string = isBoom(error) ? error.message : 'Something went wrong';
logger.error(`StatusCode : ${statusCode}, Message : ${error}`);
return res.status(statusCode).send(errorMessage);
}
export default errorMiddleware;
您可能注意到我们导入了一个名为 Boom 的软件包,我们稍后会在“服务”部分详细介绍它。
服务
就业服务
JobService 包含所有业务逻辑,并可访问数据访问层,同时还与 RabbitMQ 服务通信,将事件分发到队列。
//jobs.service.ts
import * as Boom from '@hapi/boom';
import Container from 'typedi';
import { CreateJobDto } from '../common/dtos/createJob.dto';
import EventEmitter from '../common/utils/eventEmitter';
import { Job } from '../entities/jobs.entity';
import RabbitMQService from './rabbitmq.service';
class JobsService {
private events = {
JobCreated: 'JobCreated',
};
constructor() {
this.intiializeEvents();
}
private intiializeEvents() {
EventEmitter.on(this.events.JobCreated, (job: Job) => {
const rabbitMQInstance = Container.get(RabbitMQService);
rabbitMQInstance.sendToQueue(JSON.stringify(job));
});
}
public async findJobById(jobId: string): Promise<Job> {
const job: Job = await Job.findOne(jobId);
if (!job) throw Boom.notFound();
return job;
}
public async createJob(jobDto: CreateJobDto): Promise<Job> {
const createdJob: Job = await Job.save({ ...jobDto, youtubeId: jobDto.youtubeUrl.split('v=')[1]?.slice(0, 11), status: 'pending' } as Job);
EventEmitter.emit(this.events.JobCreated, createdJob);
return createdJob;
}
}
export default JobsService;
首先,您可能会看到两个不熟悉的导入语句,我们将快速浏览一下,然后在本类中详细解释每个函数。
- 繁荣
- 用于创建具有强大、简洁且友好接口的 HTTP 对象。您可以看到抛出 404 Not Found 错误对象是多么容易。
- 类型
- TypeDI 是一个功能强大的依赖注入包,它拥有许多特性。其中一项特性就是单例服务,这也是我们在这里使用它的方式。
现在让我们更详细地了解一下该类中的一些功能。
初始化事件()
此函数使用一个全局 EventEmitter,我们在整个项目中使用它来添加发布/订阅层。它非常简单。
//eventEmitter.ts
import { EventEmitter } from 'events';
export default new EventEmitter();
现在我们可以开始监听事件,特别是稍后在创建新工作时发出的名为“JobCreated”的事件。
// Defines all the events in our service
private events = {
JobCreated: 'JobCreated',
};
private intiializeEvents() {
// Start listening for the event 'JobCreated'
EventEmitter.on(this.events.JobCreated, (job: Job) => {
// Get a singleton instance of our RabbitMQService
const rabbitMQInstance = Container.get(RabbitMQService);
// Dispatch an event containing the data of the created job
rabbitMQInstance.sendToQueue(JSON.stringify(job));
});
}
有关向 Express 后端添加发布/订阅层的更多信息,请参阅相关文档。
创建作业()
这个函数只做两件事。
- 在数据库中创建新的职位文档
- 发送一个名为“JobCreated”的事件,表示已创建新作业,这样事件监听器就会处理将该事件分发给 RabbitMQ 服务的逻辑。
RabbitMQ 服务
该服务负责连接到 RabbitMQ 服务器,创建通道并初始化队列,该队列将用于生成任务(将由我们的服务工作线程使用)。
我们使用amqplib作为 RabbitMQ 服务器的客户端。
//rabbitmq.service.ts
import { Service } from 'typedi';
import amqp, { Channel, Connection } from 'amqplib';
import { logger } from '../common/utils/logger';
@Service()
export default class RabbitMQService {
private connection: Connection;
private channel: Channel;
private queueName = 'ytgif-jobs';
constructor() {
this.initializeService();
}
private async initializeService() {
try {
await this.initializeConnection();
await this.initializeChannel();
await this.initializeQueues();
} catch (err) {
logger.error(err);
}
}
private async initializeConnection() {
try {
this.connection = await amqp.connect(process.env.NODE_ENV === 'production' ? process.env.RABBITMQ_PROD : process.env.RABBITMQ_DEV);
logger.info('Connected to RabbitMQ Server');
} catch (err) {
throw err;
}
}
private async initializeChannel() {
try {
this.channel = await this.connection.createChannel();
logger.info('Created RabbitMQ Channel');
} catch (err) {
throw err;
}
}
private async initializeQueues() {
try {
await this.channel.assertQueue(this.queueName, {
durable: true,
});
logger.info('Initialized RabbitMQ Queues');
} catch (err) {
throw err;
}
}
public async sendToQueue(message: string) {
this.channel.sendToQueue(this.queueName, Buffer.from(message), {
persistent: true,
});
logger.info(`sent: ${message} to queue ${this.queueName}`);
}
}
用于引导连接/通道/队列的代码相当标准,您可以在RabbitMQ 文档或anqplib 文档中找到这些函数的参考资料。我们需要从类外部使用的唯一函数是`sendToQueue()`,它用于将消息分发到我们的任务队列,就像在 JobService 中通过分发字符串化的 Job 对象来实现的那样。
rabbitMQInstance.sendToQueue(JSON.stringify(job));
现在我们只需要在应用程序启动时像这样初始化 RabbitMQ 服务即可。
import Container from 'typedi';
// Call initializeRabbitMQ() somewhere when starting the app
private initializeRabbitMQ() {
Container.get(RabbitMqService);
}
现在后端服务的工作已经完成,剩下的就是节点服务工作进程处理任务队列并进行实际的 GIF 转换。
在本系列的下一部分中,我们将看到如何实现一个节点服务工作线程,它将消费任务队列并执行实际的 YouTube 到 GIF 转换。
文章来源:https://dev.to/ragrag/node-backend-server-youtube-gif-maker-using-next-js-node-and-rabbitmq-47h7

