Skip to main content

订阅(Subscriptions)

除了使用查询获取数据和使用变更修改数据之外,GraphQL 规范还支持第三种操作类型,称为订阅(subscription)。 GraphQL 订阅是一种从服务器推送数据到选择监听服务器实时消息的客户端的方式。 订阅类似于查询,因为它们指定要传递给客户端的一组字段,但与立即返回单个响应不同, 它会打开一个通道,并在服务器上发生特定事件时将结果发送到客户端。

订阅的一个常见用例是通知客户端特定事件, 例如创建新对象、更新字段等(在此处阅读更多)。

使用 Apollo driver 启用订阅

要启用订阅,请将 installSubscriptionHandlers 属性设置为 true

GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
installSubscriptionHandlers: true,
}),
warning

installSubscriptionHandlers 配置选项已从最新版本的 Apollo server 中删除,并将很快在此包中废弃。 默认情况下,installSubscriptionHandlers 将回退到使用 subscriptions-transport-ws(在此处阅读更多), 但我们强烈建议改用 graphql-ws(在此处阅读更多)库。

要切换到使用 graphql-ws 包,请使用以下配置:

GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': true
},
}),
note

您还可以同时使用这两个包(subscriptions-transport-wsgraphql-ws),例如,为了向后兼容性。

代码优先

使用代码优先方法创建订阅时,我们使用 @Subscription() 装饰器(从 @nestjs/graphql 包中导出)和 graphql-subscriptions 包中提供的 PubSub 类,该类提供了一个简单的发布/订阅 API

以下订阅处理程序通过调用 PubSub#asyncIterator订阅事件。 此方法接受一个参数,即 triggerName,对应于事件主题名称。

const pubSub = new PubSub();

@Resolver((of) => Author)
export class AuthorResolver {
// ...
@Subscription((returns) => Comment)
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
}
tip

所有装饰器都是从 @nestjs/graphql 包中导出的, 而 PubSub 类是从 graphql-subscriptions 包中导出的。

note

PubSub 是一个公开简单发布和订阅 API 的类。在此处阅读更多。 请注意,Apollo 文档警告称默认实现不适用于生产环境(在此处阅读更多)。 生产应用程序应使用由外部存储支持的 PubSub 实现(在此处阅读更多)。

这将导致在 GraphQL 架构中生成以下部分:

type Subscription {
commentAdded(): Comment!
}

请注意,订阅根据定义返回一个具有单个顶级属性的对象,该属性的键是订阅的name。 此名称要么继承自订阅处理程序方法的名称(即上面的 commentAdded), 要么通过在 @Subscription() 装饰器的第二个参数中传递带有键名的选项来显式提供,如下所示。

@Subscription(returns => Comment, {
name: 'commentAdded',
})
subscribeToCommentAdded() {
return pubSub.asyncIterator('commentAdded');
}

此结构产生与上述代码示例相同的 SDL,但允许我们将方法名称与订阅解耦

发布

现在,要发布事件,我们使用 PubSub#publish 方法。 这通常在变更内部使用,以在对象图的一部分发生更改时触发客户端更新。例如:

posts/posts.resolver.ts
@Mutation(returns => Post)
async addComment(
@Args('postId', { type: () => Int }) postId: number,
@Args('comment', { type: () => Comment }) comment: CommentInput,
) {
const newComment = this.commentsService.addComment({ id: postId, comment });
pubSub.publish('commentAdded', { commentAdded: newComment });
return newComment;
}

PubSub#publish 方法接受 triggerName(再次,将其视为事件主题名称) 作为第一个参数,并将事件载荷作为第二个参数。 如上所述,订阅根据定义返回一个值,并且该值具有形状。再次查看我们的 commentAdded 订阅生成的 SDL:

type Subscription {
commentAdded(): Comment!
}

这告诉我们,订阅必须返回具有名为 commentAdded 的顶级属性的对象,该属性的值是 Comment 对象。 重要的一点是,由 PubSub#publish 方法发出的事件载荷的形状必须与订阅期望返回的值的形状相对应。 因此,在我们的上面的示例中,pubSub.publish('commentAdded', { commentAdded: newComment }) 语句发布具有适当形状的 commentAdded 事件载荷。 如果这些形状不匹配,您的订阅将在 GraphQL 验证阶段失败。

过滤订阅

要过滤特定事件,请将 filter 属性设置为过滤函数。 此函数的作用类似于传递给数组 filter 的函数。 它接受两个参数:包含事件载荷的 payload(由事件发布者发送), 以及包含在订阅请求期间传递的任何参数的 variables。 它返回一个布尔值,用于确定是否应将此事件发布给客户端侦听器。

@Subscription(returns => Comment, {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Args('title') title: string) {
return pubSub.asyncIterator('commentAdded');
}

突变订阅负载

要突变发布的事件载荷,请将 resolve 属性设置为一个函数。 该函数接收事件载荷(由事件发布者发送)并返回适当的值。

@Subscription(returns => Comment, {
resolve: value => value,
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
note

如果使用 resolve 选项,应返回未包装的载荷 (例如,使用我们的示例,直接返回一个 newComment 对象,而不是一个 { commentAdded: newComment } 对象)。

如果需要访问注入的提供程序(例如,使用外部服务验证数据),请使用以下结构。

@Subscription(returns => Comment, {
resolve(this: AuthorResolver, value) {
// "this" 指的是 "AuthorResolver" 的实例
return value;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}

与过滤器一样,相同的构造方式适用于解析:

@Subscription(returns => Comment, {
filter(this: AuthorResolver, payload, variables) {
// "this" 指的是 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}

模式优先

要在 Nest 中创建等效的订阅,我们将使用 @Subscription() 装饰器。

const pubSub = new PubSub();

@Resolver('Author')
export class AuthorResolver {
// ...
@Subscription()
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
}

要根据上下文和参数筛选特定事件,请设置 filter 属性。

@Subscription('commentAdded', {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}

要变异已发布的有效负载,我们可以使用解析(resolve)函数。

@Subscription('commentAdded', {
resolve: value => value,
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}

如果需要访问注入的提供程序(例如,使用外部服务验证数据),请使用以下结构。

@Subscription('commentAdded', {
resolve(this: AuthorResolver, value) {
// "this" 指的是 "AuthorResolver" 的实例
return value;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}

与过滤器一样,相同的构造方式适用于解析:

@Subscription('commentAdded', {
filter(this: AuthorResolver, payload, variables) {
// "this" 指的是 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}

最后一步是更新类型定义文件。

type Author {
id: Int!
firstName: String
lastName: String
posts: [Post]
}

type Post {
id: Int!
title: String
votes: Int
}

type Query {
author(id: Int!): Author
}

type Comment {
id: String
content: String
}

type Subscription {
commentAdded(title: String!): Comment
}

通过这样做,我们创建了一个单一的 commentAdded(title: String!): Comment 订阅。 您可以在此处找到完整的示例实现。

PubSub

我们在上面实例化了一个本地 PubSub 实例。 首选的方法是将 PubSub 定义为提供者程序, 并通过构造函数(使用 @Inject() 装饰器)注入它。 这样我们就可以在整个应用程序中重复使用该实例。例如,定义一个提供程序如下,然后在需要的地方注入 'PUB_SUB'

{
provide: 'PUB_SUB',
useValue: new PubSub(),
}

自定义订阅服务器

要自定义订阅服务器(例如,更改路径),请使用 subscriptions 选项属性。

GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'subscriptions-transport-ws': {
path: '/graphql'
},
}
}),

如果您正在使用 graphql-ws 包进行订阅,请将 subscriptions-transport-ws 键替换为 graphql-ws,如下所示:

GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': {
path: '/graphql'
},
}
}),

通过 WebSockets 进行身份验证

检查用户是否经过身份验证可以在 onConnect 回调函数中完成,您可以在subscriptions选项中指定该回调函数。

onConnect 将接收作为第一个参数传递给 SubscriptionClientconnectionParams(阅读更多)。

GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'subscriptions-transport-ws': {
onConnect: (connectionParams) => {
const authToken = connectionParams.authToken;
if (!isValid(authToken)) {
throw new Error('Token is not valid');
}
// 从令牌中提取用户信息
const user = parseToken(authToken);
// 将用户信息返回,以便稍后将其添加到上下文中
return { user };
},
}
},
context: ({ connection }) => {
// connection.context 将等于 "onConnect" 回调返回的内容
},
}),

在此示例中,authToken 仅在客户端首次建立连接时由客户端发送。 使用此连接进行的所有订阅都将具有相同的 authToken,因此也具有相同的用户信息。

note

subscriptions-transport-ws 中存在一个允许连接跳过 onConnect 阶段的错误(阅读更多)。 您不应假设用户在用户启动订阅时调用了 onConnect,并始终检查context是否被填充。

如果您使用 graphql-ws 包,则 onConnect 回调的签名将略有不同:

GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': {
onConnect: (context: Context<any>) => {
const { connectionParams, extra } = context;
// 用户验证将保持与上面示例中相同
// 使用 graphql-ws 时,应在 extra 字段中存储额外的上下文值
extra.user = { user: {} };
},
},
},
context: ({ extra }) => {
// 现在,您可以通过 extra 字段访问额外的上下文值
},
});

使用 Mercurius 驱动程序启用订阅

要启用订阅,请将 subscription 属性设置为 true

GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: true,
}),
note

您还可以传递选项对象来设置自定义发射器、验证传入连接等。 在此处了解更多信息(请参阅subscription)。

代码优先方式

要使用代码优先方法创建订阅,我们使用 @Subscription() 装饰器(从 @nestjs/graphql 包中导出)和来自 mercurius 包的 PubSub 类, 该类提供了一个简单的发布/订阅 API

以下订阅处理程序负责通过调用 PubSub#asyncIterator 订阅事件。 此方法接受一个参数,triggerName,对应于事件主题名称。

@Resolver((of) => Author)
export class AuthorResolver {
// ...
@Subscription((returns) => Comment)
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
}
tip

上面示例中使用的所有装饰器都是从 @nestjs/graphql 包导出的, 而 PubSub 类是从 mercurius 包导出的。

note

PubSub 是一个公开简单publishsubscribe API 的类。 查看有关如何注册自定义 PubSub 类的此部分

这将导致在 SDL 中生成以下 GraphQL 模式的部分:

type Subscription {
commentAdded(): Comment!
}

请注意,订阅根据定义返回一个对象,该对象具有单个顶级属性,其键是订阅的名称。 此名称可以从订阅处理程序方法的名称继承(即上面的 commentAdded), 或者通过将键name作为第二个参数传递给 @Subscription() 装饰器来显式提供,如下所示。

@Subscription(returns => Comment, {
name: 'commentAdded',
})
subscribeToCommentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}

这个结构产生了与前面代码示例相同的 SDL,但允许我们将方法名称与订阅解耦。

发布

现在,要发布事件,我们使用 PubSub#publish 方法。 通常,这在变异中使用,以触发客户端更新当对象图的一部分发生更改时。例如:

posts/posts.resolve.ts
@Mutation(returns => Post)
async addComment(
@Args('postId', { type: () => Int }) postId: number,
@Args('comment', { type: () => Comment }) comment: CommentInput,
@Context('pubsub') pubSub: PubSub,
) {
const newComment = this.commentsService.addComment({ id: postId, comment });
await pubSub.publish({
topic: 'commentAdded',
payload: {
commentAdded: newComment
}
});
return newComment;
}

如上所述,订阅根据定义返回一个值,并且该值具有形状。再次查看我们的 commentAdded 订阅生成的 SDL:

type Subscription {
commentAdded(): Comment!
}

这告诉我们,订阅必须返回一个具有名为 commentAdded 的顶级属性的对象,该属性具有 Comment 对象的值。 要注意的重要一点是,由 PubSub#publish 方法发出的事件有效负载的形状必须对应于从订阅中返回的值的形状。 因此,在我们上面的示例中,pubSub.publish({ topic: 'commentAdded', payload: { commentAdded: newComment } }) 语句发布了一个具有相应形状的 commentAdded 事件有效负载。 如果这些形状不匹配,您的订阅将在 GraphQL 验证阶段失败。

过滤订阅

要过滤特定事件,请将 filter 属性设置为筛选函数。此函数类似于传递给数组过滤器的函数。 它接受两个参数:包含事件有效负载的 payload(由事件发布者发送), 以及在订阅请求期间传递的任何参数的 variables。 它返回一个布尔值,确定是否应将此事件发布给客户端侦听器。

@Subscription(returns => Comment, {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}

如果需要访问已注入的提供程序(例如,使用外部服务验证数据),请使用以下结构。

@Subscription(returns => Comment, {
filter(this: AuthorResolver, payload, variables) {
// "this" 指的是 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title;
}
})
commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}

模式优先方式

要在 Nest 中创建等效的订阅,我们将使用 @Subscription() 装饰器。

const pubSub = new PubSub();

@Resolver('Author')
export class AuthorResolver {
// ...
@Subscription()
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
}

要根据上下文和参数筛选特定事件,请设置 filter 属性。

@Subscription('commentAdded', {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}

如果需要访问已注入的提供程序(例如,使用外部服务验证数据),请使用以下结构。

@Subscription('commentAdded', {
filter(this: AuthorResolver, payload, variables) {
// "this" 指的是 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title;
}
})
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}

最后一步是更新类型定义文件。

type Author {
id: Int!
firstName: String
lastName: String
posts: [Post]
}

type Post {
id: Int!
title: String
votes: Int
}

type Query {
author(id: Int!): Author
}

type Comment {
id: String
content: String
}

type Subscription {
commentAdded(title: String!): Comment
}

通过这样做,我们创建了一个单一的 commentAdded(title: String!): Comment 订阅。

PubSub

在上面的示例中,我们使用了默认的 PubSub 发射器(mqemitter)。 首选的方法(用于生产环境)是使用 mqemitter-redis。 或者,可以提供自定义的 PubSub 实现(阅读更多内容)。

GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: {
emitter: require('mqemitter-redis')({
port: 6579,
host: '127.0.0.1',
}),
},
});

通过 WebSocket 进行身份验证

检查用户是否经过身份验证可以在subscription选项中指定的 verifyClient 回调函数内完成。

verifyClient 将接收 info 对象作为第一个参数,您可以使用该对象检索请求的标头。

GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: {
verifyClient: (info, next) => {
const authorization = info.req.headers?.authorization as string;
if (!authorization?.startsWith('Bearer ')) {
return next(false);
}
next(true);
},
}
});