Skip to content

feat: custom pubsub with custom args on subscribe method #1148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions docs/subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,36 @@ app.register(mercurius, {
### Subscriptions with custom PubSub
> Note that when passing both `pubsub` and `emitter` options, `emitter` will be ignored.

The CustomPubSub interface allows you to implement a custom publish-subscribe mechanism for GraphQL subscriptions. This gives you complete control over how subscription data is published and delivered.

#### CustomPubSub Interface

To create a custom PubSub implementation, you need to implement a class with at least the following methods:

- **subscribe(topic, queue, ...customArgs)**:
- `topic`: String identifier for the subscription topic
- `queue`: A Readable stream where subscription data will be pushed
- `...customArgs`: Optional additional parameters that can be passed from the subscription resolver
- Returns: A Promise that resolves when the subscription is set up

- **publish(event, callback)**:
- `event`: Object containing `topic` and `payload` properties
- `callback`: Function to be called when the publish operation completes
- Expected behavior: Emit the payload to subscribers of the topic

#### Custom Arguments in subscribe method

The `subscribe` method supports passing custom arguments from your GraphQL resolvers. This allows for additional filtering or configuration at the subscription level. For example, you might pass an `offset` parameter to control where a subscription starts reading from, or pass filter criteria to perform server-side filtering.

Here's an example implementation using Node's EventEmitter:

```js
class CustomPubSub {
constructor () {
this.emitter = new EventEmitter()
}

async subscribe (topic, queue) {
async subscribe (topic, queue, offset) {
const listener = (value) => {
queue.push(value)
}
Expand All @@ -314,8 +337,7 @@ class CustomPubSub {
}

this.emitter.on(topic, listener)
if (!queue.close) queue.close = []
queue.close.push(close)
queue.close.push(close)
}

publish (event, callback) {
Expand All @@ -328,7 +350,13 @@ const pubsub = new CustomPubSub()

app.register(mercurius, {
schema,
resolvers,
resolvers: {
Subscription: {
retrieveItems: {
subscribe: (root, args, { pubsub }) => pubsub.subscribe('RETRIEVE_ITEMS', args.offset)
}
}
},
subscription: {
pubsub
}
Expand Down
14 changes: 14 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import type { WebSocket } from 'ws'
import { IncomingMessage, OutgoingHttpHeaders } from 'http'
import { Readable } from 'stream'
import { EventEmitter } from 'events'

type Mercurius = typeof mercurius

Expand All @@ -40,6 +41,19 @@ declare namespace mercurius {
): void;
}

export interface CustomPubSub {
emitter: EventEmitter;
subscribe(
topic: string | string[],
queue: Readable & { close: () => void },
...customArgs: any[]
): Promise<void>;
publish(
event: { topic: string, payload: any },
callback: () => void
): void;
}

export interface MercuriusContext {
app: FastifyInstance;
reply: FastifyReply;
Expand Down
9 changes: 6 additions & 3 deletions lib/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@ class SubscriptionContext {
})
}

subscribe (topics) {
// `topics` param can be:
// - string: subscribe to a single topic
// - array: subscribe to multiple topics
subscribe (topics, ...customArgs) {
if (typeof topics === 'string') {
return this.pubsub.subscribe(topics, this.queue).then(() => this.queue)
return this.pubsub.subscribe(topics, this.queue, ...customArgs).then(() => this.queue)
}
return Promise.all(topics.map((topic) => this.pubsub.subscribe(topic, this.queue))).then(() => this.queue)
return Promise.all(topics.map((topic) => this.pubsub.subscribe(topic, this.queue, ...customArgs))).then(() => this.queue)
}

publish (event) {
Expand Down
235 changes: 216 additions & 19 deletions test/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -296,31 +296,31 @@ test('subscription server sends update to subscriptions', t => {
})
})

class CustomPubSub {
constructor () {
this.emitter = new EventEmitter()
}

async subscribe (topic, queue) {
const listener = (value) => {
queue.push(value)
test('subscription with custom pubsub', t => {
class CustomPubSub {
constructor () {
this.emitter = new EventEmitter()
}

const close = () => {
this.emitter.removeListener(topic, listener)
}
async subscribe (topic, queue) {
const listener = (value) => {
queue.push(value)
}

this.emitter.on(topic, listener)
queue.close = close
}
const close = () => {
this.emitter.removeListener(topic, listener)
}

this.emitter.on(topic, listener)
queue.close = close
}

publish (event, callback) {
this.emitter.emit(event.topic, event.payload)
callback()
publish (event, callback) {
this.emitter.emit(event.topic, event.payload)
callback()
}
}
}

test('subscription with custom pubsub', t => {
const app = Fastify()
t.teardown(() => app.close())

Expand Down Expand Up @@ -491,6 +491,203 @@ test('subscription with custom pubsub', t => {
})
})

test('subscription with custom pubsub with custom params on subscribe method', t => {
class CustomPubSub {
constructor () {
this.emitter = new EventEmitter()
}

async subscribe (topic, queue, subscriptionName, filter) {
const listener = (value) => {
if (value[subscriptionName].message.includes(filter)) {
queue.push(value)
}
}

const close = () => {
this.emitter.removeListener(topic, listener)
}

this.emitter.on(topic, listener)
queue.close = close
}

publish (event, callback) {
this.emitter.emit(event.topic, event.payload)
callback()
}
}

const app = Fastify()
t.teardown(() => app.close())

const pubsub = new CustomPubSub()

const sendTestQuery = () => {
app.inject({
method: 'POST',
url: '/graphql',
body: {
query: `
query {
notifications {
id
message
}
}
`
}
}, () => {
sendTestMutation()
})
}

const sendTestMutation = () => {
app.inject({
method: 'POST',
url: '/graphql',
body: {
query: `
mutation {
addNotification(message: "Hello World") {
id
}
}
`
}
}, () => {})
}

const schema = `
type Notification {
id: ID!
message: String
}

type Query {
notifications: [Notification]
}

type Mutation {
addNotification(message: String): Notification
}

type Subscription {
onNotificationAdded(filter: String!): Notification
}
`

let idCount = 1
const notifications = [{
id: idCount,
message: 'Notification message'
}]

const resolvers = {
Query: {
notifications: () => notifications
},
Mutation: {
addNotification: async (_, { message }) => {
const id = idCount++
const notification = {
id,
message
}
notifications.push(notification)
await pubsub.emitter.emit('NOTIFICATION_ADDED', { onNotificationAdded: notification })

return notification
}
},
Subscription: {
onNotificationAdded: {
subscribe: (root, args, { pubsub }) => pubsub.subscribe('NOTIFICATION_ADDED', 'onNotificationAdded', args.filter)
}
}
}

app.register(GQL, {
schema,
resolvers,
subscription: {
pubsub
}
})

app.listen({ port: 0 }, err => {
t.error(err)

const ws = new WebSocket('ws://localhost:' + (app.server.address()).port + '/graphql', 'graphql-ws')
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true })
t.teardown(client.destroy.bind(client))
client.setEncoding('utf8')

client.write(JSON.stringify({
type: 'connection_init'
}))

client.write(JSON.stringify({
id: 1,
type: 'start',
payload: {
query: `
subscription {
onNotificationAdded(filter: "Hello") {
id
message
}
}
`
}
}))

client.write(JSON.stringify({
id: 2,
type: 'start',
payload: {
query: `
subscription {
onNotificationAdded(filter: "Hello") {
id
message
}
}
`
}
}))

client.write(JSON.stringify({
id: 2,
type: 'stop'
}))

client.on('data', chunk => {
const data = JSON.parse(chunk)

if (data.id === 1 && data.type === 'data') {
t.equal(chunk, JSON.stringify({
type: 'data',
id: 1,
payload: {
data: {
onNotificationAdded: {
id: '1',
message: 'Hello World'
}
}
}
}))

client.end()
t.end()
} else if (data.id === 2 && data.type === 'complete') {
sendTestQuery()
}
})
})
})

test('subscription server sends update to subscriptions with custom context', t => {
const app = Fastify()
t.teardown(() => app.close())
Expand Down
Loading
Loading