Skip to main content

队列

队列是一种强大的设计模式,帮助您解决常见的应用程序扩展和性能挑战。 队列可以帮助您解决的一些问题示例包括:

  1. 平滑处理处理高峰。例如,如果用户可以在任意时间启动消耗资源的任务,您可以将这些任务添加到队列中而不是同步执行。然后,您可以让工作进程以受控的方式从队列中拉取任务。随着应用程序规模扩大,您可以轻松地添加新的队列消费者以提高后端任务处理的规模。
  2. 分解可能会阻塞 Node.js 事件循环的大型任务。例如,如果用户请求需要 CPU 密集型工作,如音频转码,您可以将此任务委派给其他进程,从而释放用户界面进程以保持响应性。
  3. 在各种服务之间提供可靠的通信通道。例如,您可以在一个进程或服务中排队任务(作业),然后在另一个进程或服务中消费它们。您可以通过监听状态事件(例如完成、错误或作业生命周期中的其他状态更改)来被通知。当队列生产者或消费者失败时,它们的状态被保留,任务处理可以在节点重新启动时自动重新启动。

Nest 提供了 @nestjs/bull 包作为 Bull 的抽象/包装, Bull 是一个受欢迎、得到良好支持且性能卓越的基于 Node.js 的队列系统实现。 该包使得在 Nest 应用程序中以 Nest 友好的方式集成 Bull 队列变得简单。

Bull 使用 Redis 来持久化作业数据,因此您需要在系统上安装 Redis。 由于它是基于 Redis 的,因此您的队列架构可以是完全分布式和独立于平台的。 例如,您可以在一个(或多个)节点上运行 Nest 中的一些队列生产者和消费者, 并在其他网络节点上运行其他 Node.js 平台上的其他生产者消费者监听器

本章介绍了 @nestjs/bull 包。 我们还建议阅读 Bull 文档以获取更多背景和具体实现细节。

安装

要开始使用它,首先安装所需的依赖项。

npm install --save @nestjs/bull bull

安装过程完成后,我们可以将 BullModule 导入到根 AppModule 中。

import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}

forRoot() 方法用于注册将用于应用程序中注册的所有队列的 bull 包配置对象(除非另有规定)。 配置对象由以下属性组成:

  • limiter: RateLimiter - 用于控制队列作业处理速率的选项。有关详细信息,请参阅 RateLimiter。可选。
  • redis: RedisOpts - 用于配置 Redis 连接的选项。有关详细信息,请参阅 RedisOpt。可选。
  • prefix: string - 所有队列键的前缀。可选。
  • defaultJobOptions: JobOpts - 用于控制新作业的默认设置的选项。有关详细信息,请参阅 JobOpts。可选。
  • settings: AdvancedSettings - 高级队列配置设置。通常不应更改这些设置。有关详细信息,请参阅 AdvancedSettings。可选。

所有选项都是可选的,提供了对队列行为的详细控制。这些选项直接传递给 Bull Queue 构造函数。 在这里阅读更多关于这些选项的信息。

要注册队列,请导入 BullModule.registerQueue() 动态模块,如下所示:

BullModule.registerQueue({
name: 'audio',
});
note

通过将多个逗号分隔的配置对象传递给 registerQueue() 方法来创建多个队列

registerQueue() 方法用于实例化和/或注册队列。 队列在连接到相同的底层 Redis 数据库时,跨模块和进程共享。 每个队列都是通过其名称属性唯一的。 队列名称既用作注入标记(用于将队列注入到控制器/提供者中),也用作装饰器的参数, 将消费者类和监听器与队列关联起来。

您还可以覆盖特定队列的一些预配置选项,如下所示:

BullModule.registerQueue({
name: 'audio',
redis: {
port: 6380,
},
});

由于作业在 Redis 中持久化,每次实例化特定命名的队列(例如,当应用程序启动/重新启动时)时, 它都会尝试处理可能来自前一个未完成会话的任何旧作业。

每个队列可以有一个或多个生产者、消费者和监听器。 消费者按照特定顺序从队列中检索作业:FIFO(默认)、LIFO 或根据优先级。控制队列处理顺序的方法在此处讨论。

命名配置

如果您的队列连接到多个不同的 Redis 实例,您可以使用一种称为命名配置的技术。 此功能允许您在指定的键下注册多个配置,然后可以在队列选项中引用这些配置。

例如,假设除默认实例外,您有一个额外的 Redis 实例,由应用程序中注册的一些队列使用,您可以注册其配置如下:

BullModule.forRoot('alternative-config', {
redis: {
port: 6381,
},
});

在上面的例子中,'alternative-config' 只是一个配置键(可以是任意字符串)。

有了这个配置,您现在可以在 registerQueue() 选项对象中指向此配置:

BullModule.registerQueue({
configKey: 'alternative-config',
name: 'video'
});

生产者

作业生产者将作业添加到队列。生产者通常是应用程序服务(Nest 提供者)。 要将作业添加到队列,请首先将队列注入到服务中,如下所示:

import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';

@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
tip

@InjectQueue() 装饰器通过其名称识别队列,如 registerQueue() 方法调用中提供的(例如,'audio')。

现在,通过调用队列的 add() 方法并传递用户定义的作业对象,添加作业。 作业表示为可序列化的 JavaScript 对象(因为它们是如何存储在 Redis 数据库中的)。 您传递的作业的形状是任意的;使用它来表示作业对象的语义。

const job = await this.audioQueue.add({
foo: 'bar',
});

命名作业

作业可以具有唯一的名称。这允许您创建专门处理具有给定名称的作业的消费者

const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});
warning

使用命名作业时,您必须为添加到队列的每个唯一名称创建处理器,否则队列将抱怨缺少给定作业的处理器。 有关使用命名作业的更多信息,请参见此处

作业选项

作业可以具有与其关联的其他选项。在 Queue.add() 方法的job参数后传递一个选项对象。作业选项属性包括:

  • priority: number - 可选的优先级值。范围从 1(最高优先级)到 MAX_INT(最低优先级)。请注意,使用优先级会对性能产生轻微影响,因此谨慎使用。
  • delay: number - 等待作业可以处理之前的时间量(毫秒)。请注意,为了准确的延迟,服务器和客户端都应该同步其时钟。
  • attempts: number - 尝试作业直到完成的总次数。
  • repeat: RepeatOpts - 根据 cron 规范重复作业。请参阅 RepeatOpts
  • backoff: number | BackoffOpts - 如果作业失败,则用于自动重试的退避设置。请参阅 BackoffOpts
  • lifo: boolean - 如果为 true,则将作业添加到队列的右端而不是左端(默认为 false)。
  • timeout: number - 作业应在多少毫秒后失败并显示超时错误。
  • jobId: number | string - 覆盖作业 ID,默认情况下,作业 ID 是唯一的整数,但您可以使用此设置进行覆盖。如果您尝试添加已存在 ID 的作业,它将不会被添加。
  • removeOnComplete: boolean | number - 如果为 true,则在成功完成时删除作业。数字指定要保留的作业数量。默认行为是将作业保留在已完成的集合中。
  • removeOnFail: boolean | number - 如果为 true,则在所有尝试后失败时删除作业。数字指定要保留的作业数量。默认行为是将作业保留在失败的集合中。
  • stackTraceLimit: number - 限制将记录在堆栈跟踪中的行数。

以下是使用作业选项自定义作业的一些示例。

要延迟作业的开始,请使用 delay 配置属性。

const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ delay: 3000 }, // 3 秒延迟
);

要将作业添加到队列的右端(以 LIFO(后进先出)方式处理作业),请将配置对象的 lifo 属性设置为 true

const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true },
);

要为作业设置优先级,请使用 priority 属性。

const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ priority: 2 },
);

消费者

消费者是定义方法的类,这些方法要么处理添加到队列的作业,要么监听队列上的事件,或两者兼而有之。 使用 @Processor() 装饰器声明一个消费者类,如下所示:

import { Processor } from '@nestjs/bull';

@Processor('audio')
export class AudioConsumer {}
tip

必须将消费者注册为提供者,以便 @nestjs/bull 包可以捕获它们。

装饰符的字符串参数(例如,'audio')是要与类方法关联的队列的名称。

在消费者类中,通过使用 @Process() 装饰符装饰处理程序方法来声明作业处理程序,如下所示:

import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {
@Process()
async transcode(job: Job<unknown>) {
let progress = 0;
for (let i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 1;
await job.progress(progress);
}
return {};
}
}

装饰的方法(例如,transcode())在工作程序处于空闲状态且队列中有要处理的作业时调用。 此处理程序方法将作业对象作为其唯一参数。 处理程序方法返回的值将存储在作业对象中,并可以在稍后的监听器中访问,例如在完成事件的监听器中。

Job对象具有多个方法,允许您与其状态进行交互。 例如,上述代码使用 progress() 方法更新作业的进度。 有关完整的Job对象 API 参考,请参阅此处

您可以指定作业处理程序方法处理特定类型的作业(具有特定名称的作业) 通过将该名称传递给 @Process() 装饰符,如下所示。

您可以在给定的消费者类中有多个 @Process() 处理程序,对应于每个作业类型(name)。 当使用命名作业时,请确保每个名称都有一个处理程序。

@Process('transcode')
async transcode(job: Job<unknown>) { ... }
warning

当为相同队列定义多个消费者时,@Process({ concurrency: 1 }) 中的并发选项不会生效。 最小并发将与定义的消费者数量相匹配。即使 @Process() 处理程序使用不同的名称处理命名作业时, 也会发生这种情况。

请求范围的消费者

当消费者标记为请求范围时(了解有关注入范围的更多信息), 将为每个作业创建一个类的新实例。在作业完成后,该实例将被垃圾回收。

@Processor({
name: 'audio',
scope: Scope.REQUEST,
})

由于请求范围的消费者类是动态实例化的并且仅对单个作业进行范围化, 因此可以使用标准方法通过构造函数注入 JOB_REF

constructor(@Inject(JOB_REF) jobRef: Job) {
console.log(jobRef);
}
tip

JOB_REF 令牌从 @nestjs/bull 包中导入。

事件监听器

Bull在队列和/或作业状态发生变化时生成一组有用的事件。 Nest提供了一组装饰器,允许订阅一组标准事件。这些从@nestjs/bull包中导出。

事件监听器必须在消费者类中声明(即,在用@Processor()装饰器装饰的类内)。 要监听事件,请使用下表中的其中一个装饰器声明事件的处理程序。 例如,要监听音频队列中作业进入活动状态时发出的事件,使用以下结构:

import { Processor, Process, OnQueueActive } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {

@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
// 其他事件处理程序
// ...
}

由于Bull在分布式(多节点)环境中运行,它定义了事件局部性的概念。 此概念认识到事件可能是在单个进程内完全触发的,也可能是从不同进程共享的队列上触发的。 本地事件是在本地进程中对队列的动作或状态更改触发时产生的事件。 换句话说,当事件生成器和消费者都在单个进程中时,发生在队列上的所有事件都是本地的。

当队列跨多个进程共享时,我们会遇到全局事件的可能性。 为了使一个进程中的监听器接收到由另一个进程触发的事件通知,它必须注册为全局事件。

事件处理程序在其对应的事件发出时调用。 处理程序的调用遵循下表所示的签名,提供了与事件相关的信息的访问权限。 我们将在下面讨论本地和全局事件处理程序签名之间的一个关键区别。

本地事件监听器全局事件监听器处理程序方法签名/触发时机
@OnQueueError()@OnGlobalQueueError()handler(error: Error) - 发生错误。error 包含触发错误的详细信息。
@OnQueueWaiting()@OnGlobalQueueWaiting()handler(jobId: number
@OnQueueActive()@OnGlobalQueueActive()handler(job: Job) - 作业 job 已启动。
@OnQueueStalled()@OnGlobalQueueStalled()handler(job: Job) - 作业 job 被标记为停滞。这对于调试崩溃或暂停事件循环的作业工作者很有用。
@OnQueueProgress()@OnGlobalQueueProgress()handler(job: Job, progress: number) - 作业 job 的进度已更新为值 progress。
@OnQueueCompleted()@OnGlobalQueueCompleted()handler(job: Job, result: any) - 作业 job 成功完成,结果为 result。
@OnQueueFailed()@OnGlobalQueueFailed()handler(job: Job, err: Error) - 作业 job 由于错误 err 失败。
@OnQueuePaused()@OnGlobalQueuePaused()handler() - 队列已暂停。
@OnQueueResumed()@OnGlobalQueueResumed()handler(job: Job) - 队列已恢复。
@OnQueueCleaned()@OnGlobalQueueCleaned()handler(jobs: Job[], type: string) - 旧作业已从队列中清除。jobs 是已清除的作业数组,type 是清除的作业类型。
@OnQueueDrained()@OnGlobalQueueDrained()handler() - 每当队列处理完所有等待的作业时触发(即使可能还有一些延迟作业尚未处理)。
@OnQueueRemoved()@OnGlobalQueueRemoved()handler(job: Job) - 作业 job 已成功移除。

在监听全局事件时,方法签名可能与其本地版本略有不同。 具体而言,在本地版本中接收job对象的任何方法签名,在全局版本中将接收作业的 jobIdnumber)。 要在这种情况下获取对实际job对象的引用,使用 Queue#getJob 方法。 此调用应等待,因此处理程序应声明为async。 例如:

@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
const job = await this.immediateQueue.getJob(jobId);
console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}
note

要访问 Queue 对象(进行 getJob() 调用),您当然必须注入它。 此外,Queue 必须在您注入它的模块中注册。

除了特定的事件监听器装饰器之外,您还可以结合使用通用的 @OnQueueEvent() 装饰器 和 BullQueueEventsBullQueueGlobalEvents 枚举。 有关事件的更多信息,请阅读此处

队列管理

队列具有一组API,允许您执行管理功能,如暂停和恢复、检索各种状态的作业计数等。 您可以在此处找到完整的队列API。 直接在Queue对象上调用这些方法,如下所示,其中包括暂停/恢复的示例。

使用pause()方法调用暂停队列。暂停的队列将不会处理新作业,直到恢复,但正在处理的当前作业将继续直到完成。

await audioQueue.pause();

要恢复暂停的队列,请使用resume()方法,如下所示:

await audioQueue.resume();

单独的进程

作业处理程序也可以在单独的(分叉的)进程中运行(来源)。 这有几个优势:

  • 该进程是隔离的,因此如果崩溃,它不会影响工作者。
  • 您可以运行阻塞代码而不影响队列(作业不会停滞)。
  • 更好地利用多核CPU。
  • 与Redis的连接更少。
app.module.ts

import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { join } from 'path';

@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}

请注意,因为您的函数在分叉的进程中执行,所以依赖注入(和IoC容器)将不可用。 这意味着您的处理器函数将需要包含(或创建)它所需的所有外部依赖项的所有实例。

processor.js

import { Job, DoneCallback } from 'bull';

export default function (job: Job, cb: DoneCallback) {
console.log(`[${process.pid}] ${JSON.stringify(job.data)}`);
cb(null, 'It works');
}

异步配置

您可能希望异步传递bull选项而不是静态传递。 在这种情况下,使用forRootAsync()方法,该方法提供了处理异步配置的几种方式。 同样,如果要异步传递队列选项,请使用registerQueueAsync()方法。

一种方法是使用工厂函数:

// 异步传递bull选项
BullModule.forRootAsync({
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});

我们的工厂函数就像任何其他异步提供者一样 (例如,它可以是异步的,并且可以通过inject注入依赖项):

// 通过工厂函数注入依赖项
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});

或者,您可以使用useClass语法:

// 通过useClass使用异步配置
BullModule.forRootAsync({
useClass: BullConfigService,
});

上面的构造将在BullModule内实例化BullConfigService, 并使用它通过调用createSharedConfiguration()提供一个选项对象。 请注意,这意味着BullConfigService必须实现SharedBullConfigurationFactory接口, 如下所示:

// 实现SharedBullConfigurationFactory接口
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
createSharedConfiguration(): BullModuleOptions {
return {
redis: {
host: 'localhost',
port: 6379,
},
};
}
}

为了防止在BullModule内创建BullConfigService并使用从不同模块导入的提供者, 您可以使用useExisting语法。

// 使用useExisting以防止在BullModule内创建新实例
BullModule.forRootAsync({
imports: [ConfigModule],
useExisting: ConfigService,
});

此构造与useClass相同,但有一个关键的区别 - BullModule 将查找已导入的模块以重用现有的ConfigService,而不是实例化新的。

示例

可以在此处找到一个可行的示例。