除了通过查询(Query)获取数据和通过变更(Mutation)修改数据之外,GraphQL 规范还支持第三种操作类型,称为 订阅(subscription)。GraphQL 订阅是一种允许服务器主动向选择监听实时消息的客户端推送数据的机制。订阅与查询类似,都需要指定要返回给客户端的字段集合,但不同之处在于,订阅会打开一个通道,每当服务器端发生特定事件时,就会向客户端发送结果,而不是立即返回单一结果。
订阅的常见用例是通知客户端某些特定事件的发生,例如新对象的创建、字段的更新等(详细说明可参考这里)。
要启用订阅功能,需要将 installSubscriptionHandlers 属性设置为 true。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
installSubscriptionHandlers: true,
}),如需切换为 graphql-ws 包,请使用如下配置:
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': true
},
}),你也可以同时使用这两个包(subscriptions-transport-ws 和
graphql-ws),例如用于兼容旧版本。
要使用代码优先方式创建订阅,我们需要使用 @Subscription() 装饰器(由 @nestjs/graphql 包导出)以及 graphql-subscriptions 包中的 PubSub 类。PubSub 提供了一个简单的发布/订阅 API(publish/subscribe API)。
下面的订阅处理器通过调用 PubSub#asyncIterableIterator 方法来订阅某个事件。该方法接收一个参数 triggerName,即事件主题名称。
const pubSub = new PubSub()
@Resolver(() => Author)
export class AuthorResolver {
// ...
@Subscription(() => Comment)
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded')
}
}所有装饰器均由 @nestjs/graphql 包导出,而 PubSub 类由
graphql-subscriptions 包导出。
上述代码会在 GraphQL 模式定义语言(SDL)中生成如下部分:
type Subscription {
commentAdded(): Comment!
}需要注意的是,订阅本质上会返回一个对象,该对象仅包含一个顶层属性,其键名为订阅的名称。该名称要么继承自订阅处理器方法的名称(如上例中的 commentAdded),要么可以通过将带有 name 键的选项作为 @Subscription() 装饰器的第二个参数显式指定,如下所示:
@Subscription(() => Comment, {
name: 'commentAdded',
})
subscribeToCommentAdded() {
return pubSub.asyncIterableIterator('commentAdded')
}这种写法生成的 SDL 与前面的代码示例相同,但允许我们将方法名与订阅名称解耦。
现在,要发布事件,我们可以使用 PubSub#publish 方法。这个方法通常在变更操作中使用,用于在对象图的某一部分发生变化时,触发客户端的更新。例如:
@Mutation(() => Comment)
async addComment(
@Args('postId', { type: () => Int }) postId: number,
@Args('comment', { type: () => Comment }) comment: CommentInput,
) {
const newComment = this.commentsService.addComment({ id: postId, comment })
pubSub.publish('commentAdded', { commentAdded: newComment })
return newComment
}PubSub#publish 方法接收两个参数:第一个参数是 triggerName(可以理解为事件主题名称),第二个参数是事件负载(payload)。如前所述,订阅本质上会返回一个值,并且该值有其结构。请再次查看我们为 commentAdded 订阅生成的 SDL:
type Subscription {
commentAdded(): Comment!
}这告诉我们,订阅必须返回一个顶层属性名为 commentAdded 的对象,并且该属性的值是一个 Comment 对象。需要注意的重要一点是,PubSub#publish 方法发布的事件负载的结构,必须与订阅期望返回的值的结构一致。因此,在上面的示例中,pubSub.publish('commentAdded', { commentAdded: newComment }) 语句发布了一个 commentAdded 事件,并携带了结构正确的负载。如果这两者的结构不匹配,订阅将在 GraphQL 验证阶段失败。
要过滤特定事件,可以为 filter 属性指定一个过滤函数。该函数的作用类似于数组的 filter 方法。它接收两个参数:payload(事件负载,由事件发布者发送)和 variables(在订阅请求时传入的参数)。该函数返回一个布尔值,用于决定该事件是否应该发布给客户端监听者。
@Subscription(() => Comment, {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Args('title') title: string) {
return pubSub.asyncIterableIterator('commentAdded')
}如果需要修改发布的事件负载,可以为 resolve 属性指定一个函数。该函数接收事件负载(由事件发布者发送),并返回最终要传递给客户端的值。
@Subscription(() => Comment, {
resolve: value => value,
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded')
}如果你使用了 resolve 选项,应该返回未包裹的负载(例如,在本例中,直接返回 newComment 对象,而不是 {{ '{' }} commentAdded: newComment {{ '}' }} 这样的对象)。
如果你需要访问注入的提供者(例如,使用外部服务来校验数据),可以使用如下写法:
@Subscription(() => Comment, {
resolve(this: AuthorResolver, value) {
// "this" 指向 "AuthorResolver" 的实例
return value
}
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded')
}同样的写法也适用于过滤函数:
@Subscription(() => Comment, {
filter(this: AuthorResolver, payload, variables) {
// "this" 指向 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title
}
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded')
}要在 Nest 中创建等效的订阅,我们需要使用 @Subscription() 装饰器。
const pubSub = new PubSub()
@Resolver('Author')
export class AuthorResolver {
// ...
@Subscription()
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded')
}
}如果需要根据上下文和参数过滤特定事件,可以设置 filter 属性。
@Subscription('commentAdded', {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded')
}如果需要对已发布的 payload 进行变换,可以使用 resolve 函数。
@Subscription('commentAdded', {
resolve: value => value,
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded')
}如果你需要访问注入的提供者(例如,使用外部服务对数据进行校验),可以使用如下结构:
@Subscription('commentAdded', {
resolve(this: AuthorResolver, value) {
// "this" 指向 "AuthorResolver" 的实例
return value
}
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded')
}同样的结构也适用于 filter:
@Subscription('commentAdded', {
filter(this: AuthorResolver, payload, variables) {
// "this" 指向 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title
}
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded')
}最后一步是更新类型定义文件。
type Author {
id: Int!
firstName: String
lastName: String
posts: [Post]
}
type Post {
id: Int!
title: String
votes: Int
}
type Query {
author(id: Int!): Author
}
type Comment {
id: String
content: String
}
type Subscription {
commentAdded(title: String!): Comment
}至此,我们已经创建了一个 commentAdded(title: String!): Comment 订阅。你可以在这里查看完整的示例实现。
在上文中,我们实例化了一个本地的 PubSub 实例。更推荐的做法是将 PubSub 定义为一个提供者,并通过构造函数注入(使用 @Inject() 装饰器)。这样可以在整个应用中复用该实例。例如,可以如下定义一个提供者,然后在需要的地方注入 'PUB_SUB'。
{
provide: 'PUB_SUB',
useValue: new PubSub(),
}如果需要自定义订阅服务器(例如更改路径),可以使用 subscriptions 选项属性。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'subscriptions-transport-ws': {
path: '/graphql'
},
}
}),如果你在订阅中使用的是 graphql-ws 包,请将 subscriptions-transport-ws 键替换为 graphql-ws,如下所示:
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': {
path: '/graphql'
},
}
}),你可以在 subscriptions 选项中指定的 onConnect 回调函数内,检查用户是否已通过身份验证。
onConnect 的第一个参数是传递给 SubscriptionClient 的 connectionParams(详细内容请参阅这里)。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'subscriptions-transport-ws': {
onConnect: (connectionParams) => {
const authToken = connectionParams.authToken
if (!isValid(authToken)) {
throw new Error('Token is not valid')
}
// 从令牌中提取用户信息
const user = parseToken(authToken)
// 返回用户信息,稍后会添加到上下文中
return { user }
},
}
},
context: ({ connection }) => {
// connection.context 的值等于 "onConnect" 回调返回的内容
},
}),在本例中,authToken 只会在客户端首次建立连接时发送一次。
所有使用该连接发起的订阅都会使用相同的 authToken,因此用户信息也相同。
subscriptions-transport-ws 存在一个漏洞,允许连接跳过 onConnect
阶段(详细内容请参阅这里)。你不能假设用户在开始订阅时一定调用了
onConnect,因此应始终检查 context 是否已被正确填充。
如果你使用的是 graphql-ws 包,onConnect 回调的签名会略有不同:
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': {
onConnect: (context: Context<any>) => {
const { connectionParams, extra } = context
// 用户验证方式与上例相同
// 使用 graphql-ws 时,额外的上下文值应存储在 extra 字段中
extra.user = { user: {} }
},
},
},
context: ({ extra }) => {
// 你现在可以通过 extra 字段访问自定义的上下文值
},
})要启用订阅功能,只需将 subscription 属性设置为 true。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: true,
}),你也可以传递 options
对象,以自定义事件发射器、验证连接等。详细内容请参阅这里(见
subscription 相关内容)。
要使用代码优先方式创建订阅,我们需要用到 @Subscription() 装饰器(由 @nestjs/graphql 包导出)以及 mercurius 包中的 PubSub 类。PubSub 提供了简单的发布/订阅 API。
下面的订阅处理器通过调用 PubSub#asyncIterableIterator 方法来实现对事件的订阅。该方法接收一个参数 triggerName,即事件主题名称。
@Resolver(() => Author)
export class AuthorResolver {
// ...
@Subscription(() => Comment)
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded')
}
}上述示例中用到的所有装饰器均由 @nestjs/graphql 包导出,而 PubSub 类则由
mercurius 包导出。
PubSub 是一个类,提供了简单的 publish 和 subscribe
API。关于如何注册自定义 PubSub
类,请参考本节内容。
这样会在 GraphQL 的 SDL(Schema Definition Language,模式定义语言)中生成如下部分:
type Subscription {
commentAdded(): Comment!
}需要注意的是,订阅本质上会返回一个对象,该对象只有一个顶层属性,其键名就是订阅的名称。这个名称要么继承自订阅处理器方法的名称(如上例中的 commentAdded),要么可以通过将带有 name 键的选项作为 @Subscription() 装饰器的第二个参数显式指定,如下所示:
@Subscription(() => Comment, {
name: 'commentAdded',
})
subscribeToCommentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded')
}这种写法生成的 SDL 与前面的代码示例相同,但允许我们将方法名与订阅名称解耦。
现在,要发布事件,我们可以使用 PubSub#publish 方法。通常会在变更(Mutation)中使用它,以便在对象图的某部分发生变化时触发客户端更新。例如:
@Mutation(() => Comment)
async addComment(
@Args('postId', { type: () => Int }) postId: number,
@Args('comment', { type: () => Comment }) comment: CommentInput,
@Context('pubsub') pubSub: PubSub,
) {
const newComment = this.commentsService.addComment({ id: postId, comment })
await pubSub.publish({
topic: 'commentAdded',
payload: {
commentAdded: newComment
}
})
return newComment
}如前所述,订阅本质上会返回一个值,并且该值具有特定的结构。请再次查看我们 commentAdded 订阅生成的 SDL:
type Subscription {
commentAdded(): Comment!
}这表明订阅必须返回一个对象,该对象有一个顶层属性名为 commentAdded,其值为一个 Comment 对象。需要特别注意的是,PubSub#publish 方法发布的事件负载(payload)结构必须与订阅期望返回的值结构一致。因此,在上面的示例中,pubSub.publish({ topic: 'commentAdded', payload: { commentAdded: newComment } }) 语句发布了一个 commentAdded 事件,并携带了结构正确的负载。如果这两者的结构不匹配,订阅将在 GraphQL 校验阶段失败。
要过滤特定事件,可以设置 filter 属性为一个过滤函数。该函数的作用类似于数组的 filter 方法。它接收两个参数:payload(事件发布者发送的事件负载)和 variables(订阅请求时传入的参数)。该函数返回一个布尔值,用于决定该事件是否应该发布给客户端监听者。
@Subscription(() => Comment, {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded')
}如果你需要访问注入的提供者(例如,使用外部服务来校验数据),可以使用如下结构:
@Subscription(() => Comment, {
filter(this: AuthorResolver, payload, variables) {
// "this" 指向 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title
}
})
commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded')
}要在 Nest 中创建等效的订阅,我们需要使用 @Subscription() 装饰器。
const pubSub = new PubSub()
@Resolver('Author')
export class AuthorResolver {
// ...
@Subscription()
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded')
}
}如果你需要根据上下文和参数过滤特定事件,可以设置 filter 属性。
@Subscription('commentAdded', {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded')
}如果你需要访问注入的提供者(例如,使用外部服务对数据进行校验),可以使用如下写法:
@Subscription('commentAdded', {
filter(this: AuthorResolver, payload, variables) {
// "this" 指向 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title
}
})
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded')
}最后一步是更新类型定义文件。
type Author {
id: Int!
firstName: String
lastName: String
posts: [Post]
}
type Post {
id: Int!
title: String
votes: Int
}
type Query {
author(id: Int!): Author
}
type Comment {
id: String
content: String
}
type Subscription {
commentAdded(title: String!): Comment
}至此,我们就创建了一个 commentAdded(title: String!): Comment 订阅。
在上面的示例中,我们使用了默认的 PubSub 事件发射器(mqemitter)。
在生产环境中,推荐的做法是使用 mqemitter-redis。当然,你也可以提供自定义的 PubSub 实现(详细说明请参见这里)。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: {
emitter: require('mqemitter-redis')({
port: 6579,
host: '127.0.0.1',
}),
},
})你可以在 subscription 选项中指定的 verifyClient 回调函数内,检查用户是否已通过身份验证。
verifyClient 会接收一个 info 对象作为第一个参数,你可以通过它获取请求头(headers)。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: {
verifyClient: (info, next) => {
const authorization = info.req.headers?.authorization as string
if (!authorization?.startsWith('Bearer ')) {
return next(false)
}
next(true)
},
}
}),