Skip to main content

WebSocket

网关

本文档中讨论的大多数概念,如依赖注入、装饰器、异常过滤器、管道、守卫和拦截器,同样适用于网关。 在可能的情况下,Nest抽象出实现细节,以便相同的组件可以在基于HTTP的平台、WebSockets和微服务上运行。 本节介绍了与WebSockets特定的Nest方面。

在Nest中,网关只是一个带有@WebSocketGateway()装饰器的类。 从技术上讲,网关是与平台无关的,这使它们与创建适配器后兼容任何WebSockets库。 默认情况下,支持两个WS平台:socket.iows。 您可以选择最适合您需求的平台。此外,您还可以按照此指南构建自己的适配器。

note

可以将网关视为提供程序;这意味着它们可以通过类构造函数注入依赖项。 此外,其他类(提供程序和控制器)也可以注入网关。

安装

要开始构建基于WebSockets的应用程序,请首先安装所需的软件包:

npm i --save @nestjs/websockets @nestjs/platform-socket.io

概述

通常情况下,每个网关都监听与HTTP服务器相同的端口,除非您的应用程序不是Web应用程序, 或者您手动更改了端口。通过将参数传递给@WebSocketGateway(80)装饰器, 可以修改此默认行为,其中80是所选端口号。您还可以使用以下结构设置网关使用的 命名空间

@WebSocketGateway(80, { namespace: 'events' })
warning

网关在被引用于现有模块的提供程序数组中之前不会实例化。

可以通过将第二个参数传递给@WebSocketGateway()装饰器, 将任何支持的选项传递给socket构造函数, 如下所示:

@WebSocketGateway(81, { transports: ['websocket'] })

现在,网关正在监听,但我们尚未订阅任何传入的消息。 让我们创建一个处理程序,该处理程序将订阅events消息并用相同的数据回应用户。

events.gateway.ts
@SubscribeMessage('events')
handleEvent(@MessageBody() data: string): string {
return data;
}
tip

@SubscribeMessage()@MessageBody()装饰器是从@nestjs/websockets软件包导入的。

创建网关后,我们可以在我们的模块中注册它。

events.module.ts
@Module({
providers: [EventsGateway]
})
export class EventsModule {}

您还可以通过将属性键传递给装饰器,从传入的消息体中提取它:

events.gateway.ts
@SubscribeMessage('events')
handleEvent(@MessageBody('id') id: number): number {
// id === messageBody.id
return id;
}

如果不想使用装饰器,以下代码在功能上是等效的:

events.gateway.ts
@SubscribeMessage('events')
handleEvent(client: Socket, data: string): string {
return data;
}

在上面的例子中,handleEvent()函数接受两个参数。 第一个是特定于平台的socket实例, 而第二个是从客户端接收的数据。 尽管这种方法不推荐,因为它需要在每个单元测试中模拟socket实例。

收到events消息后,处理程序将使用与网络发送的相同数据发送确认。 此外,还可以使用特定于库的方法发出消息,例如使用client.emit()方法。 为了访问已连接的socket实例,请使用@ConnectedSocket()装饰器。

events.gateway.ts
@SubscribeMessage('events')
handleEvent(
@MessageBody() data: string,
@ConnectedSocket() client: Socket,
): string {
return data;
}
tip

@ConnectedSocket()装饰器是从@nestjs/websockets软件包导入的。

但是,在这种情况下,您将无法利用拦截器。如果不想响应用户, 可以简单地跳过return语句(或显式返回"虚假"值,例如undefined)。

现在,当客户端发出以下消息时:

socket.emit('events', { name: 'Nest' });

handleEvent()方法将被执行。 为了监听从上述处理程序中发出的消息,客户端必须附加相应的确认监听器:

socket.emit('events', { name: 'Nest' }, (data) => console.log(data));

多重响应

确认仅分发一次。此外,它不受本机WebSockets实现的支持。 为了解决此限制,可以返回一个包含两个属性的对象。 事件是发出事件的名称,数据必须转发到客户端。

events.gateway.ts
@SubscribeMessage('events')
handleEvent(@MessageBody() data: unknown): WsResponse<unknown> {
const event = 'events';
return { event, data };
}
tip

WsResponse接口是从@nestjs/websockets软件包导入的。

warning

如果您的数据字段依赖于ClassSerializerInterceptor, 则应返回实现WsResponse的类实例,因为它会忽略普通的JavaScript对象响应。

为了监听传入的响应,客户端必须应用另一个事件监听器。

socket.on('events', (data) => console.log(data));

异步响应

消息处理程序可以同步或异步地响应。因此,支持异步方法。 消息处理程序还可以返回Observable,在这种情况下,将在流完成之前发出结果值。

events.gateway.ts
@SubscribeMessage('events')
onEvent(@MessageBody() data: unknown): Observable<WsResponse<number>> {
const event = 'events';
const response = [1, 2, 3];

return from(response).pipe(
map(data => ({ event, data })),
);
}

在上面的示例中,消息处理程序将3次响应(数组中的每个项目)。

生命周期挂钩

有3个有用的生命周期挂钩可用。它们都有相应的接口,并在以下表格中描述:

  • OnGatewayInit:强制实现afterInit()方法。以库特定的服务器实例作为参数(如果需要,还会传递其余实例)。
  • OnGatewayConnection:强制实现handleConnection()方法。以库特定的客户端socket实例作为参数。
  • OnGatewayDisconnect:强制实现handleDisconnect()方法。以库特定的客户端socket实例作为参数。
tip

每个生命周期接口都是从@nestjs/websockets软件包中公开的。

服务器

有时,您可能希望直接访问本机、平台特定的服务器实例。 将对此对象的引用作为参数传递给afterInit()方法(OnGatewayInit接口)。 另一个选项是使用@WebSocketServer()装饰器。

@WebSocketServer()
server: Server;
tip

@WebSocketServer()装饰器是从@nestjs/websockets软件包导入的。

Nest将在准备好使用后自动将服务器实例分配给此属性。

例子

此处提供了一个工作示例。

异常过滤器

HTTP异常过滤器层与相应的WebSockets层之间唯一的区别是, 您应该使用WsException而不是HttpException来抛出异常。

throw new WsException('Invalid credentials.');
tip

WsException类是从@nestjs/websockets软件包导入的。

使用上述示例,Nest将处理抛出的异常并使用以下结构发出exception消息:

{
"status": "error",
"message": "Invalid credentials."
}

过滤器

WebSockets异常过滤器的行为等效于HTTP异常过滤器。 以下示例使用手动实例化的方法范围过滤器。 与基于HTTP的应用程序一样, 您还可以使用网关范围的过滤器(即,在网关类前加上@UseFilters()装饰器)。

@UseFilters(new WsExceptionFilter())
@SubscribeMessage('events')
onEvent(client, data: any): WsResponse<any> {
const event = 'events';
return { event, data };
}

继承

通常,您将创建完全定制的异常过滤器,以满足应用程序的要求。 但是,在某些情况下,您可能希望简单地扩展核心异常过滤器,并根据某些因素覆盖行为。

为了将异常处理委托给基本过滤器,您需要扩展BaseWsExceptionFilter并调用继承的catch()方法。

import { Catch, ArgumentsHost } from '@nestjs/common';
import { BaseWsExceptionFilter } from '@nestjs/websockets';

@Catch()
export class AllExceptionsFilter extends BaseWsExceptionFilter {
catch(exception: unknown, host: ArgumentsHost) {
super.catch(exception, host);
}
}

上述实现只是演示了这种方法。您扩展的异常过滤器的实现将包含您的定制业务逻辑(例如,处理各种条件)。

管道

常规管道和WebSockets管道之间没有根本区别。 唯一的区别是,您应该使用WsException而不是HttpException来抛出异常。 此外,所有管道将仅应用于data参数(因为验证或转换client实例是无用的)。

tip

WsException类是从@nestjs/websockets软件包中公开的。

绑定管道

以下示例使用手动实例化的方法范围管道。 与基于HTTP的应用程序一样, 您还可以使用网关范围的管道(即,在网关类前加上@UsePipes()装饰器)。

@UsePipes(new ValidationPipe())
@SubscribeMessage('events')
handleEvent(client: Client, data: unknown): WsResponse<unknown> {
const event = 'events';
return { event, data };
}

守卫

WebSockets守卫和常规HTTP应用程序守卫之间没有根本区别。 唯一的区别是,您应该使用WsException而不是HttpException来抛出异常。

tip

WsException类是从@nestjs/websockets软件包中公开的。

绑定守卫

以下示例使用方法范围守卫。 与基于HTTP的应用程序一样, 您还可以使用网关范围的守卫(即,在网关类前加上@UseGuards()装饰器)。

@UseGuards(AuthGuard)
@SubscribeMessage('events')
handleEvent(client: Client, data: unknown): WsResponse<unknown> {
const event = 'events';
return { event, data };
}

拦截器

常规拦截器和WebSockets拦截器之间没有区别。 以下示例使用手动实例化的方法范围拦截器。 与基于HTTP的应用程序一样, 您还可以使用网关范围的拦截器(即,在网关类前加上@UseInterceptors()装饰器)。

@UseInterceptors(new TransformInterceptor())
@SubscribeMessage('events')
handleEvent(client: Client, data: unknown): WsResponse<unknown> {
const event = 'events';
return { event, data };
}

适配器

WebSockets模块是平台无关的,因此, 您可以通过使用WebSocketAdapter接口引入自己的库(甚至是本地实现)。 该接口强制实现以下表中描述的几种方法:

  • create:基于传递的参数创建一个socket实例
  • bindClientConnect:绑定客户端连接事件
  • bindClientDisconnect:绑定客户端断开连接事件(可选)
  • bindMessageHandlers:将传入的消息绑定到相应的消息处理程序
  • close:终止服务器实例

扩展socket.io

socket.io包包装在IoAdapter类中。 如果您想增强适配器的基本功能,例如,技术要求需要在负载平衡的多个实例之间广播事件, 您可以扩展IoAdapter并覆盖负责实例化新socket.io服务器的单个方法。 首先,让我们安装所需的包。

warning

要在多个负载平衡实例上使用socket.io, 您必须通过在客户端socket.io配置中设置transports: ['websocket'] 或在负载均衡器中启用基于cookie的路由来禁用轮询。 仅使用Redis是不够的。有关更多信息,请参见此处

npm i --save redis socket.io @socket.io/redis-adapter

安装包后,我们可以创建一个RedisIoAdapter类。

import { IoAdapter } from '@nestjs/platform-socket.io';
import { ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

export class RedisIoAdapter extends IoAdapter {
private adapterConstructor: ReturnType<typeof createAdapter>;

async connectToRedis(): Promise<void> {
const pubClient = createClient({ url: `redis://localhost:6379` });
const subClient = pubClient.duplicate();

await Promise.all([pubClient.connect(), subClient.connect()]);

this.adapterConstructor = createAdapter(pubClient, subClient);
}

createIOServer(port: number, options?: ServerOptions): any {
const server = super.createIOServer(port, options);
server.adapter(this.adapterConstructor);
return server;
}
}

然后,简单地切换到新创建的Redis适配器。

const app = await NestFactory.create(AppModule);
const redisIoAdapter = new RedisIoAdapter(app);
await redisIoAdapter.connectToRedis();

app.useWebSocketAdapter(redisIoAdapter);

Ws库

另一个可用的适配器是WsAdapter,它充当框架与集成了高速且经过充分测试的ws库之间的代理。 该适配器完全兼容本机浏览器WebSockets,比socket.io包快得多。 不幸的是,它的可用功能明显较少。然而,在某些情况下,您可能不一定需要它们。

note

ws库不支持命名空间(socket.io中广泛使用的通信通道)。 然而,为了模仿此功能,您可以在不同路径上 挂载多个ws服务器(例如:@WebSocketGateway({ path: '/users' }))。

要使用ws,首先必须安装所需的包:

npm i --save @nestjs/platform-ws

安装包后,我们可以切换适配器:

const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new WsAdapter(app));
tip

WsAdapter@nestjs/platform-ws导入。

高级(自定义适配器)

出于演示目的,我们将手动集成ws库。 正如前面提到的,该库的适配器已经创建并在@nestjs/platform-ws包中以WsAdapter类的形式公开。 以下是简化实现的潜在外观:

ws-adapter.ts
import * as WebSocket from 'ws';
import { WebSocketAdapter, INestApplicationContext } from '@nestjs/common';
import { MessageMappingProperties } from '@nestjs/websockets';
import { Observable, fromEvent, EMPTY } from 'rxjs';
import { mergeMap, filter } from 'rxjs/operators';

export class WsAdapter implements WebSocketAdapter {
constructor(private app: INestApplicationContext) {}

create(port: number, options: any = {}): any {
return new WebSocket.Server({ port, ...options });
}

bindClientConnect(server, callback: Function) {
server.on('connection', callback);
}

bindMessageHandlers(
client: WebSocket,
handlers: MessageMappingProperties[],
process: (data: any) => Observable<any>,
) {
fromEvent(client, 'message')
.pipe(
mergeMap(data => this.bindMessageHandler(data, handlers, process)),
filter(result => result),
)
.subscribe(response => client.send(JSON.stringify(response)));
}

bindMessageHandler(
buffer,
handlers: MessageMappingProperties[],
process: (data: any) => Observable<any>,
): Observable<any> {
const message = JSON.parse(buffer.data);
const messageHandler = handlers.find(
handler => handler.message === message.event,
);
if (!messageHandler) {
return EMPTY;
}
return process(messageHandler.callback(message.data));
}

close(server) {
server.close();
}
}
note

当您想利用ws库时, 使用内置的WsAdapter而不是创建自己的适配器。

然后,我们可以使用useWebSocketAdapter()方法设置自定义适配器:

main.ts
const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new WsAdapter(app));

示例

可在此处找到使用WsAdapter的工作示例。