RabbitMQ
RabbitMQ是一个开源且轻量级的消息代理,支持多种消息协议。 它可以部署在分布式和联合配置 中,以满足高规模、高可用性的要求。 此外,它是最广泛部署的消息代理,在小型创业公司和大型企业中被广泛使用。
安装
要开始构建基于RabbitMQ的微服务,首先安装所需的包:
npm i --save amqplib amqp-connection-manager
概述
要使用RabbitMQ传输器,请将以下选项对象传递给createMicroservice()
方法:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
});
Transport
枚举从@nestjs/microservices
包中导入。
选项
options
属性是特定于所选传输器的。RabbitMQ传输器公开了下面描述的属性。
urls
:连接URLqueue
:服务器将监听的队列名称prefetchCount
:设置通道的预取计数isGlobalPrefetchCount
:启用每通道预取noAck
:如果为false,则启用手动确认模式queueOptions
:附加队列选项(在此处阅读更多)socketOptions
:附加套接字选项(在此处阅读更多)headers
:要与每条消息一起发送的标头
客户端
像其他微服务传输器一样,
您有几种方法
可以创建RabbitMQ ClientProxy
实例。
一种创建实例的方法是使用ClientsModule
。
要使用ClientsModule
创建客户端实例,
导入它并使用register()
方法传递一个带有与createMicroservice()
方法中显示的相同属性的选项对象,
以及一个用作注入令牌的name
属性。
在此处阅读有关ClientsModule
的更多信息。
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
},
]),
]
...
})
也可以使用其他创建客户端的选项(ClientProxyFactory
或@Client()
)。
您可以在此处阅读有关它们的信息。
上下文
在更复杂的场景中,您可能希望访问有关传入请求的更多信息。
当使用RabbitMQ传输器时,可以访问RmqContext
对象。
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(`Pattern: ${context.getPattern()}`);
}
@Payload()
,@Ctx()
和RmqContext
从@nestjs/microservices
包中导入。
要访问原始的RabbitMQ消息(具有properties
、fields
和content
),
请使用RmqContext
对象的getMessage()
方法,如下所示:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getMessage());
}
要检索到RabbitMQ通道的引用,
请使用RmqContext
对象的getChannelRef
方法,如下所示:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getChannelRef());
}
消息确认
为了确保消息永远不会丢失,RabbitMQ支持消息确认。 消费者通过向RabbitMQ发送确认来告诉它特定消息已被接收、处理, 并且RabbitMQ可以安全删除它。如果消费者死亡(其通道关闭、连接关闭或TCP连接丢失), 而没有发送确认,RabbitMQ将理解该消息未完全处理,并将其重新排队。
要启用手动确认模式,请将noAck
属性设置为false
:
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
noAck: false,
queueOptions: {
durable: false
},
},
当启用手动消费者确认时,必须从工作程序发送适当的确认以表示我们已完成任务。
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
}
记录构建器
为了配置消息选项,您可以使用RmqRecordBuilder
类(注意:这也适用于基于事件的流)。
例如,要设置headers
和priority
属性,请使用setOptions
方法,如下所示:
const message = ':cat:';
const record = new RmqRecordBuilder(message)
.setOptions({
headers: {
['x-version']: '1.0.0',
},
priority: 3,
})
.build();
this.client.send('replace-emoji', record).subscribe(...);
RmqRecordBuilder
类从@nestjs/microservices
包中导出。
您还可以在服务器端读取这些值,方法是访问RmqContext
,如下所示:
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
const { properties: { headers } } = context.getMessage();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}