Skip to main content

RabbitMQ

RabbitMQ是一个开源且轻量级的消息代理,支持多种消息协议。 它可以部署在分布式和联合配置中,以满足高规模、高可用性的要求。 此外,它是最广泛部署的消息代理,在小型创业公司和大型企业中被广泛使用。

安装

要开始构建基于RabbitMQ的微服务,首先安装所需的包:

npm i --save amqplib amqp-connection-manager

概述

要使用RabbitMQ传输器,请将以下选项对象传递给createMicroservice()方法:

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
});
tip

Transport枚举从@nestjs/microservices包中导入。

选项

options属性是特定于所选传输器的。RabbitMQ传输器公开了下面描述的属性。

  • urls:连接URL
  • queue:服务器将监听的队列名称
  • prefetchCount:设置通道的预取计数
  • isGlobalPrefetchCount:启用每通道预取
  • noAck:如果为false,则启用手动确认模式
  • queueOptions:附加队列选项(在此处阅读更多)
  • socketOptions:附加套接字选项(在此处阅读更多)
  • headers:要与每条消息一起发送的标头

客户端

像其他微服务传输器一样, 您有几种方法 可以创建RabbitMQ ClientProxy实例。

一种创建实例的方法是使用ClientsModule。 要使用ClientsModule创建客户端实例, 导入它并使用register()方法传递一个带有与createMicroservice()方法中显示的相同属性的选项对象, 以及一个用作注入令牌的name属性。 在此处阅读有关ClientsModule的更多信息。

@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
},
]),
]
...
})

也可以使用其他创建客户端的选项(ClientProxyFactory@Client())。 您可以在此处阅读有关它们的信息。

上下文

在更复杂的场景中,您可能希望访问有关传入请求的更多信息。 当使用RabbitMQ传输器时,可以访问RmqContext对象。

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(`Pattern: ${context.getPattern()}`);
}
tip

@Payload()@Ctx()RmqContext@nestjs/microservices包中导入。

要访问原始的RabbitMQ消息(具有propertiesfieldscontent), 请使用RmqContext对象的getMessage()方法,如下所示:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getMessage());
}

要检索到RabbitMQ通道的引用, 请使用RmqContext对象的getChannelRef方法,如下所示:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getChannelRef());
}

消息确认

为了确保消息永远不会丢失,RabbitMQ支持消息确认。 消费者通过向RabbitMQ发送确认来告诉它特定消息已被接收、处理, 并且RabbitMQ可以安全删除它。如果消费者死亡(其通道关闭、连接关闭或TCP连接丢失), 而没有发送确认,RabbitMQ将理解该消息未完全处理,并将其重新排队。

要启用手动确认模式,请将noAck属性设置为false

options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
noAck: false,
queueOptions: {
durable: false
},
},

当启用手动消费者确认时,必须从工作程序发送适当的确认以表示我们已完成任务。

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();

channel.ack(originalMsg);
}

记录构建器

为了配置消息选项,您可以使用RmqRecordBuilder类(注意:这也适用于基于事件的流)。 例如,要设置headerspriority属性,请使用setOptions方法,如下所示:

const message = ':cat:';
const record = new RmqRecordBuilder(message)
.setOptions({
headers: {
['x-version']: '1.0.0',
},
priority: 3,
})
.build();

this.client.send('replace-emoji', record).subscribe(...);
tip

RmqRecordBuilder类从@nestjs/microservices包中导出。

您还可以在服务器端读取这些值,方法是访问RmqContext,如下所示:

@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
const { properties: { headers } } = context.getMessage();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}