WebSocket
网关
本文档中讨论的大多数概念,如依赖注入、装饰器、异常过滤器、管道、守卫和拦截器,同样适用于网关。 在可能的情况下,Nest抽象出实现细节,以便相同的组件可以在基于HTTP的平台、WebSockets和微服务上运行。 本节介绍了与WebSockets特定的Nest方面。
在Nest中,网关只是一个带有@WebSocketGateway()装饰器的类。
从技术上讲,网关是与平台无关的,这使它们与创建适配器后兼容任何WebSockets库。
默认情况下,支持两个WS平台:socket.io
和ws。
您可以选择最适合您需求的平台。此外,您还可以按照此指南构建自己的适配器。
可以将网关视为提供程序;这意味着它们可以通过类构造函数注入依赖项。 此外,其他类(提供程序和控制器)也可以注入网关。
安装
要开始构建基于WebSockets的应用程序,请首先安装所需的软件包:
npm i --save @nestjs/websockets @nestjs/platform-socket.io
概述
通常情况下,每个网关都监听与HTTP服务器相同的端口,除非您的应用程序不是Web应用程序,
或者您手动更改了端口。通过将参数传递给@WebSocketGateway(80)装饰器,
可以修改此默认行为,其中80是所选端口号。您还可以使用以下结构设置网关使用的
命名空间:
@WebSocketGateway(80, { namespace: 'events' })
网关在被引用于现有模块的提供程序数组中之前不会实例化。
可以通过将第二个参数传递给@WebSocketGateway()装饰器,
将任何支持的选项传递给socket构造函数,
如下所示:
@WebSocketGateway(81, { transports: ['websocket'] })
现在,网关正在监听,但我们尚未订阅任何传入的消息。
让我们创建一个处理程序,该处理程序将订阅events消息并用相同的数据回应用户。
@SubscribeMessage('events')
handleEvent(@MessageBody() data: string): string {
return data;
}
@SubscribeMessage()和@MessageBody()装饰器是从@nestjs/websockets软件包导入的。
创建网关后,我们可以在我们的模块中注册它。
@Module({
providers: [EventsGateway]
})
export class EventsModule {}
您还 可以通过将属性键传递给装饰器,从传入的消息体中提取它:
@SubscribeMessage('events')
handleEvent(@MessageBody('id') id: number): number {
// id === messageBody.id
return id;
}
如果不想使用装饰器,以下代码在功能上是等效的:
@SubscribeMessage('events')
handleEvent(client: Socket, data: string): string {
return data;
}
在上面的例子中,handleEvent()函数接受两个参数。
第一个是特定于平台的socket实例,
而第二个是从客户端接收的数据。
尽管这种方法不推荐,因为它需要在每个单元测试中模拟socket实例。
收到events消息后,处理程序将使用与网络发送的相同数据发送确认。
此外,还可以使用特定于库的方法发出消息,例如使用client.emit()方法。
为了访问已连接的socket实例,请使用@ConnectedSocket()装饰器。
@SubscribeMessage('events')
handleEvent(
@MessageBody() data: string,
@ConnectedSocket() client: Socket,
): string {
return data;
}
@ConnectedSocket()装饰器是从@nestjs/websockets软件包导入的。
但是,在这种情况下,您将无法利用拦截器。如果不想响应用户,
可以简单地跳过return语句(或显式返回"虚假"值,例如undefined)。
现在,当客户端发出以下消息时:
socket.emit('events', { name: 'Nest' });
handleEvent()方法将被执行。
为了监听从上述处理程序中发出的消息,客户端必须附加相应的确认监听器:
socket.emit('events', { name: 'Nest' }, (data) => console.log(data));
多重响应
确认仅分发一次。此外,它不受本机WebSockets实现的支持。 为了解决此限制,可以返回一个包含两个属性的对象。 事件是发出事件的名称,数据必须转发到客户端。
@SubscribeMessage('events')
handleEvent(@MessageBody() data: unknown): WsResponse<unknown> {
const event = 'events';
return { event, data };
}
WsResponse接口是从@nestjs/websockets软件包导入的。
如果您的数据字段依赖于ClassSerializerInterceptor,
则应返回实现WsResponse的类实例,因为它会忽略普通的JavaScript对象响应。
为了监听传入的响应,客户端必须应用另一个事件监听器。
socket.on('events', (data) => console.log(data));
异步响应
消息处理程序可以同步或异步地响应。因此,支持异步方法。
消息处理程序还可以返回Observable,在这种情况下,将在流完成之前发出结果值。
@SubscribeMessage('events')
onEvent(@MessageBody() data: unknown): Observable<WsResponse<number>> {
const event = 'events';
const response = [1, 2, 3];
return from(response).pipe(
map(data => ({ event, data })),
);
}
在上面的示例中,消息处理程序将3次响应(数组中的每个项目)。
生命周期挂钩
有3个有用的生命周期挂钩可用。它们都有相应的接口,并在以下表格中描述:
OnGatewayInit:强制实现afterInit()方法。以库特定的服务器实例作为参数(如果需要,还会传递其余实例)。OnGatewayConnection:强制实现handleConnection()方法。以库特定的客户端socket实例作为参数。OnGatewayDisconnect:强制实现handleDisconnect()方法。以库特定的客户端socket实例作为参数。
每个生命周期接口都是从@nestjs/websockets软件包中公开的。
服务器
有时,您可能希望直接访问本机、平台特定的服务器实例。
将对此对象的引用作为参数传递给afterInit()方法(OnGatewayInit接口)。
另一个选项是使用@WebSocketServer()装饰器。
@WebSocketServer()
server: Server;
@WebSocketServer()装饰器是从@nestjs/websockets软件包导入的。
Nest将在准备好使用后自动将服务器实例分配给此属性。
例子
此处提供了一个工作示例。
异常过滤器
HTTP异常过滤器层与相应的WebSockets层之间唯一的区别是,
您应该使用WsException而不是HttpException来抛出异常。
throw new WsException('Invalid credentials.');
WsException类是从@nestjs/websockets软件包导入的。
使用上述示例,Nest将处理抛出的异常并使用以下结构发出exception消息:
{
"status": "error",
"message": "Invalid credentials."
}
过滤器
WebSockets异常过滤器的行为等效于HTTP异常过滤器。
以下示例使用手动实例化的方法范围过滤器。
与基于HTTP的应用程序一样,
您还可以使用网关范围的过滤器(即,在网关类前加上@UseFilters()装饰器)。
@UseFilters(new WsExceptionFilter())
@SubscribeMessage('events')
onEvent(client, data: any): WsResponse<any> {
const event = 'events';
return { event, data };
}
继承
通常,您将创建完全定制的异常过滤器,以满足应用程序的要求。 但是,在某些情况下,您可能希望简单地扩展核心异常过滤器,并根据某些因素覆盖行为。
为了将异常处理委托给基本过滤器,您需要扩展BaseWsExceptionFilter并调用继承的catch()方法。
import { Catch, ArgumentsHost } from '@nestjs/common';
import { BaseWsExceptionFilter } from '@nestjs/websockets';
@Catch()
export class AllExceptionsFilter extends BaseWsExceptionFilter {
catch(exception: unknown, host: ArgumentsHost) {
super.catch(exception, host);
}
}
上述实现只是演示了这种方法。您扩展的异常过滤器的实现将包含您的定制业务逻辑(例如,处理各种条件)。
管道
常规管道和WebSockets管道之间没有根本区别。
唯一的区别是,您应该使用WsException而不是HttpException来抛出异常。
此外,所有管道将仅应用于data参数(因为验证或转换client实例是无用的)。
WsException类是从@nestjs/websockets软件包中公开的。
绑定管道
以下示例使用手动实例化的方法范围管道。
与基于HTTP的应用程序一样,
您还可以使用网关范围的管道(即,在网关类前加上@UsePipes()装饰器)。
@UsePipes(new ValidationPipe())
@SubscribeMessage('events')
handleEvent(client: Client, data: unknown): WsResponse<unknown> {
const event = 'events';
return { event, data };
}
守卫
WebSockets守卫和常规HTTP应用程序守卫之间没有根本区别。
唯一的区别是,您应该使用WsException而不是HttpException来抛出异常。
WsException类是从@nestjs/websockets软件包中公开的。
绑定守卫
以下示例使用方法范围守卫。
与基于HTTP的应用程序一样,
您还可以使用网关范围的守卫(即,在网关类前加上@UseGuards()装饰器)。
@UseGuards(AuthGuard)
@SubscribeMessage('events')
handleEvent(client: Client, data: unknown): WsResponse<unknown> {
const event = 'events';
return { event, data };
}
拦截器
常规拦截器和WebSockets拦截器之间没有区别。
以下示例使用手动实例化的方法范围拦截器。
与基于HTTP的应用程序一样,
您还可以使用网关范围的拦截器(即,在网关类前加上@UseInterceptors()装饰器)。
@UseInterceptors(new TransformInterceptor())
@SubscribeMessage('events')
handleEvent(client: Client, data: unknown): WsResponse<unknown> {
const event = 'events';
return { event, data };
}
适配器
WebSockets模块是平台无关的,因此,
您可以通过使用WebSocketAdapter接口引入自己的库(甚至是本地实现)。
该接口强制实现以下表中描述的几种方法:
create:基于传递的参数创建一个socket实例bindClientConnect:绑定客户端连接事件bindClientDisconnect:绑定客户端断开连接事件(可选)bindMessageHandlers:将传入的消息绑定到相应的消息处理程序close:终止服务器实例
扩展socket.io
socket.io包包装在IoAdapter类中。
如果您想增强适配器的基本功能,例如,技术要求需要在负载平衡的多个实例之间广播事件,
您可以扩展IoAdapter并覆盖负责实例化新socket.io服务器的单个方法。
首先,让我们安装所需的包。
要在多个负载平衡实例上使用socket.io,
您必须通过在客户端socket.io配置中设置transports: ['websocket']
或在负载均衡器中启用基于cookie的路由来禁用轮询。
仅使用Redis是不够的。有关更多信息,请参见此处。
npm i --save redis socket.io @socket.io/redis-adapter
安装包后,我们可以创建一个RedisIoAdapter类。
import { IoAdapter } from '@nestjs/platform-socket.io';
import { ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
export class RedisIoAdapter extends IoAdapter {
private adapterConstructor: ReturnType<typeof createAdapter>;
async connectToRedis(): Promise<void> {
const pubClient = createClient({ url: `redis://localhost:6379` });
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
this.adapterConstructor = createAdapter(pubClient, subClient);
}
createIOServer(port: number, options?: ServerOptions): any {
const server = super.createIOServer(port, options);
server.adapter(this.adapterConstructor);
return server;
}
}
然后,简单地切换到新创建的Redis适配器。
const app = await NestFactory.create(AppModule);
const redisIoAdapter = new RedisIoAdapter(app);
await redisIoAdapter.connectToRedis();
app.useWebSocketAdapter(redisIoAdapter);
Ws库
另一个可用的适配器是WsAdapter,它充当框架与集成了高速且经过充分测试的ws库之间的代理。
该适配器完全兼容本机浏览器WebSockets,比socket.io包快得多。
不幸的是,它的可用功能明显较少。然而,在某些情况下,您可能不一定需要它们。
ws库不支持命名空间(socket.io中广泛使用的通信通道)。
然而,为了模仿此功能,您可以 在不同路径上
挂载多个ws服务器(例如:@WebSocketGateway({ path: '/users' }))。
要使用ws,首先必须安装所需的包:
npm i --save @nestjs/platform-ws
安装包后,我们可以切换适配器:
const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new WsAdapter(app));
WsAdapter从@nestjs/platform-ws导入。
高级(自定义适配器)
出于演示目的,我们将手动集成ws库。
正如前面提到的,该库的适配器已经创建并在@nestjs/platform-ws包中以WsAdapter类的形式公开。
以下是简化实现的潜在外观:
import * as WebSocket from 'ws';
import { WebSocketAdapter, INestApplicationContext } from '@nestjs/common';
import { MessageMappingProperties } from '@nestjs/websockets';
import { Observable, fromEvent, EMPTY } from 'rxjs';
import { mergeMap, filter } from 'rxjs/operators';
export class WsAdapter implements WebSocketAdapter {
constructor(private app: INestApplicationContext) {}
create(port: number, options: any = {}): any {
return new WebSocket.Server({ port, ...options });
}
bindClientConnect(server, callback: Function) {
server.on('connection', callback);
}
bindMessageHandlers(
client: WebSocket,
handlers: MessageMappingProperties[],
process: (data: any) => Observable<any>,
) {
fromEvent(client, 'message')
.pipe(
mergeMap(data => this.bindMessageHandler(data, handlers, process)),
filter(result => result),
)
.subscribe(response => client.send(JSON.stringify(response)));
}
bindMessageHandler(
buffer,
handlers: MessageMappingProperties[],
process: (data: any) => Observable<any>,
): Observable<any> {
const message = JSON.parse(buffer.data);
const messageHandler = handlers.find(
handler => handler.message === message.event,
);
if (!messageHandler) {
return EMPTY;
}
return process(messageHandler.callback(message.data));
}
close(server) {
server.close();
}
}
当您想利用ws库时,
使用内置的WsAdapter而不是创建自己的适配器。
然后,我们可以使用useWebSocketAdapter()方法设置自定义适配器:
const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new WsAdapter(app));
示例
可在此处找到使用WsAdapter的工作示例。