DEV Community

Cover image for On-Demand Shared GraphQL Subscriptions with RxJS
TheGuildBot for The Guild

Posted on • Edited on • Originally published at the-guild.dev

On-Demand Shared GraphQL Subscriptions with RxJS

This article was published on Friday, January 27, 2023 by Ghislain Thau @ The Guild Blog

GraphQL has native support for subscriptions as schema operations. GraphQL subscriptions allow to
receive continuous updates, usually implemented using a persistent WebSocket connection or
Server-Sent Events.

Ideally, we want subscriptions to be stateless and independent for every subscriber: when a
subscriber subscribes to an entity, this subscription is independent of any other subscription and
can be handled by any instance of our GraphQL server. This is the case when the entity subscribed to
is "simple" and processing updates is trivial: for example, we receive events from a pubsub
provider/message queue (e.g. Redis, Nats.io, etc.) in the form { type: update, entityId: xyz }. In
this case, our subscription handler can simply subscribe to these events, filter on the requested
entityId, and forward those events (maybe a bit processed) to the subscriber.

However, this ideal scenario is not always possible. We might have "complex" entities we want to
subscribe to, e.g. views that are aggregates of data over several data sources. The view might be
expensive to fetch and subscribing to the view's content updates might involve subscribing to
multiple events and performing expensive processing and/or additional fetching in order to compute
the updates and send them to the subscriber.

In this case, if multiple subscribers subscribe to the same view, the events' processing, additional
fetching, and computing would have to be done for each subscriber, which will lead to performance
issues as the number of subscribers increases.

In this article, we'll show how we can improve this use case by sharing the subscriptions' sources
and also perform their side-effect processing only when there are subscribers.


The solution uses RxJS, a library for Reactive Functional
Programming
, it is based on the
Observer pattern and provides a wide range of
functions
to deal with streams of events in a declarative
way.

Mutualizing the Subscription's Work

If the subscription is independent of the subscriber (e.g. the logged-in user) and only dependent
upon the view, then we can mutualize the events' listening, buffering, processing, additional
fetching, and computing required to update the view. This work can be done once and the result
delivered to each subscriber.

A Solution

A Simple Example to Illustrate the Solution

In the following lines, we'll use a very simplified example in order to present the solution: a
GraphQL subscription that produces sequential integers. Then we'll apply the solution to a more
complex real-life example.

The solution to the simple example is implemented in this
repository.

Using RxJS Multicasting Capabilities

By default observables are lazy and cold, meaning their side-effect are run only when a subscriber
subscribes to the observable. Furthermore, for every subscriber, the observable pipeline is
executed.

In the following example, the http query will be executed twice: query$ is a cold observable, no
side-effect happens until a subscriber subscribes to it. When a second subscriber subscribes, the
observable side-effect is re-executed.

const query$ = httpClient.get('/path/to/some/resource')
query$.subscribe(console.log)
query$.subscribe(console.log)
Enter fullscreen mode Exit fullscreen mode

This is in opposition to Promises which are eager and hot: in this example the http query is
executed only once.

const queryPromise = fetch('/path/to/some/resource')
queryPromise.then(console.log)
queryPromise.then(console.log)
Enter fullscreen mode Exit fullscreen mode

However, RxJS has multicasting capabilities to turn cold observables into hot observables. Using
RxJS's multicasting operators, we can create observable pipelines which are shared: their
side-effect is triggered when the first subscriber subscribes to it and when more than one
subscriber subscribes, the pipeline is executed only once and the emitted value is passed to all
subscribers.

Let's see an example:

import { interval } from 'rxjs'

const obs$ = interval(1000) // observable emitting 0, 1, 2, ... every second
obs$.subscribe(console.log) // prints 0, 1, 2, ...

setTimeout(() => obs$.subscribe(console.log), 3000) // after 3 seconds, prints 0, 1, 2, ...
Enter fullscreen mode Exit fullscreen mode

However, using the share operator, the observable pipeline
is executed once and the second subscriber (even subscribing later), gets the same (shared) result
as the first subscriber.

const obs$ = interval(1000).pipe(share()) // observable emitting 0, 1, 2, ... every second
obs$.subscribe(console.log) // prints 0, 1, 2, ...

setTimeout(() => obs$.subscribe(console.log), 3000) // after 3 seconds, prints 3, 4, 5 ...
Enter fullscreen mode Exit fullscreen mode

Sharing the Observables

In the simple example above, we have multiple subscribers subscribing to a shared observable, in the
same context of execution. However, in GraphQL subscriptions, the 2nd (or nth) subscriber will have
its own context of execution. If we want to reuse a shared observable, we have to store it.

Let's create a module to manage the subscriptions' shared observables:

// util function to get or create a publisher
// takes a "cache" of publisher and a factory function and returns a higher-order function to get or create from this cache
export const getOrCreatePublisher = (publishersMap, makePublisherFn) => entityId => {
  if (publishersMap.has(entityId)) {
    return publishersMap.get(entityId)!
  }

  const newPublisher = makePublisherFn(entityId)
  publishersMap.set(entityId, newPublisher)
  return newPublisher
}
Enter fullscreen mode Exit fullscreen mode
// assuming that the entityId is a number, and the shared observable emits numbers
const publishersMap = new Map<number, Observable<number>>()

const makePublisher = (entityId: number): Observable<number> => {
  return interval(1000).pipe(share())
}

export const getPublisher = getOrCreatePublisher(publishersMap, makePublisher)
Enter fullscreen mode Exit fullscreen mode

Now from our GraphQL's subscription resolver, we can use it like the following:

import { createSchema } from 'graphql-yoga'
import { eachValueFrom } from 'rxjs-for-await'

export const schema = createSchema({
  typeDefs: /* GraphQL */ `
    type Subscription {
      giveMeInts: Int!
    }
  `,
  resolvers: {
    Subscription: {
      giveMeInts: {
        subscribe(root, args, context) {
          return eachValueFrom(getPublisher(args.entityId))
        },
        resolve(root, args, context) {
          return root
        }
      }
    }
  }
})
Enter fullscreen mode Exit fullscreen mode

With this code, when a subscriber subscribes, it will create the shared observable (publisher),
store it in a cache so that it can be reused by subsequent subscribers, and subscribe to it. When a
second subscriber subscribes, it will get the existing publisher from the cache and subscribe to it,
and it will get the same value as the first subscriber gets. When all subscribers have unsubscribed,
the source observable is unsubscribed. If later, a new subscriber subscribes, the source observable
will be resubscribed and its pipeline reexecuted (in our case, it will start emitting from 0 again).


The graphql-js engine expects the subscribe handler to return an AsyncIterable, so we use the
rxjs-for-await package from Ben Lesh which contains
different functions from transforming an Observable into an AsyncIterable (there are different
functions to deal with backpressure and lossiness/losslessness).

So far, so good. But we have a memory leak: after all subscribers have unsubscribed, we keep the
publishers in the cache. We can clean those up using the
finalize operator: this operator allows running a
callback on observable termination (either the observable is complete or has no more subscribers).

Let's modify our publisher factory function:

import { interval } from 'rxjs'
import { finalize, share } from 'rxjs/operators'

const makePublisher = (entityId: number): Observable<number> => {
  return interval(1000).pipe(
    // clean up publisher cache when no more subscribers to this publisher
    finalize(() => publishersMap.delete(entityId)),
    share()
  )
}
Enter fullscreen mode Exit fullscreen mode

Delay the Shared Observable Cleanup

Because the view might be expensive to fetch and update, we only do so when users are subscribing to
it. In the previous example, as soon as all users have unsubscribed, the shared observable is
unsubscribed and deleted from the cache. This might not be the desired behavior, e.g.:

  • we can have one user subscribing to the view, this user refreshes his browser's page, effectively ending the subscription, and triggering a cache cleanup, but then the view is resubscribed almost immediately, triggering a new fresh fetch
  • we can have one user stopping the subscription (e.g. navigating away from this view to another part of our app), but another user subscribing soon (a few milliseconds, seconds or minutes) after

To optimize for those use cases, we can delay the unsubscription of the shared observable when no
more subscribers are subscribing to it, using the resetOnRefCountZero config of the share
operator. This is available in RxJS v7.

const makePublisher = (entityId: number): Observable<number> => {
  return interval(1000).pipe(
    // clean up publisher cache when no more subscribers to this publisher
    finalize(() => publishersMap.delete(entityId)),
    share({
      // when no more subscribers, wait 10 seconds to unsubscribe from the source and reset the underlying subject
      resetOnRefCountZero: () => timer(10_000)
    })
  )
}
Enter fullscreen mode Exit fullscreen mode

In the example above, after all subscribers have unsubscribed (refcount is 0) the connector (the
subject that subscribes to the source observable) now waits for 10 seconds (10000 ms) before it
resets the shared observable. During this period of time, the observable pipeline continues to
execute. If a new subscriber subscribes before the timer has emitted, the refcount is again
greater than 0 and the connector will keep connecting to the source observable.


We use timer in this example but we can use any observable (e.g. receiving a specific event from our message queue) that emits at some point. Be careful though to make sure that the observable you use will actually emit otherwise you'll have a memory leak. To remedy this issue, you might want to add safety, e.g. using raceWith with timer to make sure you don't wait indefinitely:

  share({
    // when no more subscribers, wait for 'my_event' up to 1 minute then unsubscribe from the source and reset the underlying subject
    resetOnRefCountZero: () => pubsub.events('my_event').pipe(
      raceWith(timer(60_000)),
    ),
  }),
Enter fullscreen mode Exit fullscreen mode

Reuse Shared Observable Sources across Multiple Subscriptions

With this solution, we can even go further: if a subscription depends on the processing of common
events or on the source of another subscription, we can reuse the shared observable created for
those.

Let's make a new subscription that returns a sentence with the current number emitted by the
giveMeInts subscription's source:

export const schema = createSchema({
  typeDefs: /* GraphQL */ `
    type Subscription {
      giveMeInts: Int!
      giveMeStringfiedInts: String!
    }
  `,
  resolvers: {
    Subscription: {
      /* giveMeInts subscription's resolver unchanged */
      giveMeStringfiedInts: {
        subscribe(root, args, context) {
          return eachValueFrom(getStringifiedPublisher(args.entityId))
        },
        resolve(root, args, context) {
          return root
        }
      }
    }
  }
})
Enter fullscreen mode Exit fullscreen mode

And the new publisher reusing the other subscription's publisher:

import { map } from 'rxjs/operators'

// create a new cache for those publishers
const stringifiedPublishersMap = new Map<number, Observable<string>>()

export const getStringifiedPublisher = getOrCreatePublisher(
  stringifiedPublishersMap,
  makeStringifiedPublisher
)

function makeStringifiedPublisher(entityId: number): Observable<string> {
  // this new publisher gets or creates the source publisher from the other subscription
  return getPublisher(id).pipe(
    map(val => `The current number is ${val}`),
    // clean up publisher cache when no more subscribers to this publisher
    finalize(() => stringifiedPublishersMap.delete(entityId)),
    share({
      // when no more subscribers, wait 10 seconds to unsubscribe from the source and reset the underlying subject
      resetOnRefCountZero: () => timer(10_000)
    })
  )
}
Enter fullscreen mode Exit fullscreen mode

Now, whether a subscription to giveMeStringfiedInts or giveMeInts starts first, both will be
shared and in sync. This further mutualizes the processing of expensive computations. In the simple
example showcased above, the advantage might be small, but in a real-world scenario, it could make a
big difference: imagine one subscription sending the full structure of a expensive-to-compute view
while a second subscription sends only a diff of the current structure with the previous one, the
second subscription's source depends on the source of the first. Mutualizing the view's computations
makes a difference regardless of which subscription has been triggered first.

Run Server-Side Logic When a User's Subscription Is Closed

In GraphQL we implement subscriptions by writing two functions:

  • a subscribe function which is run one time only and must return an AsyncIterable
  • a resolve function which is run every time a value is emitted by the AsyncIterable returned by the subscribe function and transforms this value into the Subscription output type.

In order to execute logic, we have 2 places: in subscribe when the subscription starts and in
resolve on each emitted value. What if we want to execute logic when the subscription ends
(whether it was completed or was stopped by the client)?

The answer is: it is not contemplated in the spec. But it doesn't mean we can't do it. In fact, it
is even very simple! If we use the solution described above, we already have all the pieces in
place. We only need to add one line: in the subscribe function where we set up the AsyncIterable
(by transforming the Observable into an AsyncIterable), we only have to use RxJS's finalize
operator.

import { createSchema } from 'graphql-yoga'
import { eachValueFrom } from 'rxjs-for-await'
import { finalize } from 'rxjs/operators'

export const schema = createSchema({
  typeDefs: /* GraphQL */ `
    type Subscription {
      giveMeInts: Int!
    }
  `,
  resolvers: {
    Subscription: {
      giveMeInts: {
        subscribe(root, args, context) {
          return eachValueFrom(
            getPublisher(args.entityId).pipe(
              finalize(() => {
                // run logic here when subscription ends
                console.log(`Subscription giveMeInts has ended for user ${context.userId}`)
              })
            )
          )
        },
        resolve(root, args, context) {
          return root
        }
      }
    }
  }
})
Enter fullscreen mode Exit fullscreen mode

In the previous examples, we used the finalize operator on the shared observable pipeline in order
to clean up after all subscriptions to a specific entity had ended. But we can also use it per
subscription by piping it onto the shared observable: this way it is only run for when this specific
subscription ends. And of course, like we just did, you can combine both!

Apply to a Real-Life Example

Let's consider a view such as a complex "playlist": this playlist is a list of items organized in
blocks, the items themselves are a complex aggregate of several multimedia (video, audio, graphics)
objects, those multimedia objects themselves might be aggregate (bundles) or have of some children
objects (e.g. sub-clips, graphics on a video, subtitles). In effect, this complex playlist is
organized in the form of a tree of objects.

The system emits different events:

  • some are related directly to the playlist itself (e.g. new item in the playlist, item removed from the playlist, item changed position inside playlist)
  • others are related to the parent multimedia objects (object deleted, object's metadata changed, object has new children objects)
  • others are related to the children objects of the top level objects (object deleted, object's metadata changed)

This playlist's data is expensive to fetch, can be large, and is cached only when users subscribe to
it: because it is expensive to compute updates, we want to ensure we don't process events for all
playlists in the system, only the ones some user is currently interested in.

Let's reuse the techniques explained above.

Buffer Events

First, we'll buffer the events, dedupe them and share the observable because those events will be
checked against every playlist subscribed to, so we can share the buffering and deduplication:

// utils
export const isNotEmpty = <T>({ length }: ArrayLike<T>) => length > 0
Enter fullscreen mode Exit fullscreen mode
import { from } from 'rxjs'
import { buffer, filter, share } from 'rxjs/operators'

const playlistEventNames = ['playlist_event_1', 'playlist_event_2', 'playlist_event_n'] as const
const objectsEventNames = ['objects_event_1', 'objects_event_2', 'objects_event_n'] as const

function bufferEvents(eventNames, bufferTime = 500) {
  return from(pubsub.events(eventNames)).pipe(buffer(bufferTime), filter(isNotEmpty), share())
}

const allPlaylistEvents$ = bufferEvents(playlistEventNames)
const allObjectsEvents$ = bufferEvents(objectsEventNames)
Enter fullscreen mode Exit fullscreen mode

Create Shared Publishers for the Playlists' Structure Changes

In this example, we'll assume that the datasource is the source of truth, we'll listen to events to
detect potential changes in the playlist and we'll force refetch (invalidate cache and fetch latest
version) from the datasource (assuming calls are deduped using a Dataloader backed by a cache).

import { merge } from 'rxjs'
import { bufferTime, filter, finalize, map, share, switchMap } from 'rxjs/operators'

const playlistStructureMap = new Map<number, Observable<PlaylistStructure>>()
const playlistStructureDiffMap = new Map<number, Observable<PlaylistStructureDiff>>()

export const getPlaylistStructurePublisher = getOrCreatePublisher(
  playlistStructureMap,
  makePlaylistStructurePublisher
)

export const getPlaylistStructureDiffPublisher = getOrCreatePublisher(
  playlistStructureDiffMap,
  makePlaylistStructureDiffPublisher
)

function makePlaylistStructurePublisher(playlistId: number): Observable<PlaylistStructure> {
  const playlistEvents$ = allPlaylistEvents$.pipe(
    map(events => events.filter(ev => ev.playlistId === playlistId)),
    filter(isNotEmpty)
  )

  const playlistObjectsEvents$ = allObjectsEvents.pipe(
    // this function filter the events that can have an impact on the structure of the playlist
    // e.g. "object has a new child object"
    // and discard other events that have no impact on the structure
    // e.g. "object name has changed"
    map(events => filterObjectEventsThatAffectPlaylistStructure(events)),
    // only keeps events where the objectId refers to an object that belongs to this playlist's structure
    map(events => filterObjectsThatBelongToPlaylist(events, playlistId)),
    filter(isNotEmpty)
  )

  return merge(playlistEvents$, playlistObjectsEvents$).pipe(
    // buffer because both source observables might emit shortly one after another
    bufferTime(500),
    filter(isNotEmpty),
    // fetchPlaylistStructure is a function backed by a dataloader with cache
    // passing `true` as 2nd param to force fetch from datasource instead of cache
    switchMap(_ => fetchPlaylistStructure(playlistId, true)),
    // remove publisher from cache after it's been unsubscribed
    finalize(() => {
      console.debug(
        `Shared playlistStructurePublisher[playlist=${playlistId}] has finalized, removing it from cache`
      )
      playlistStructureMap.delete(playlistId)
    }),
    share({
      // keep subscribing for 1 minute after last subscriber unsubscribes
      resetOnRefCountZero: () => timer(60_000)
    })
  )
}

function makePlaylistStructureDiffPublisher(playlistId: number): Observable<PlaylistStructureDiff> {
  // reuse the playlist shared observable
  return getPlaylistStructurePublisher(playlistId).pipe(
    startWith([]), // pairwise needs 2 values to compare, initialize observable with a default value
    pairwise(), // groups emission into a tuple [previousValue, currentValue]
    map(([previousPlaylist, currentPlaylist]) => makeDiff(previousPlaylist, currentPlaylist)),
    filter(isNotEmpty),
    // remove publisher from cache after it's been unsubscribed
    finalize(() => {
      console.debug(
        `Shared playlistStructureDiffPublisher[playlist=${playlistId}] has finalized, removing it from cache`
      )
      playlistStructureDiffMap.delete(playlistId)
    }),
    share({
      // keep subscribing for 1 minute after last subscriber unsubscribes
      resetOnRefCountZero: () => timer(60_000)
    })
  )
}
Enter fullscreen mode Exit fullscreen mode

Writing the Subscription's Resolver

Now that we have all the subscription's required logic encapsulated and data processing shared,
let's use it from the subscription's resolver:

import { createSchema } from 'graphql-yoga'
import { eachValueFrom } from 'rxjs-for-await'

export const schema = createSchema({
  typeDefs: /* GraphQL */ `
    type Subscription {
      playlistStructureUpdates(playlistId: Int): PlaylistStructure!
      playlistStructureDiffs(playlistId: Int): PlaylistStructureDiff!
    }
  `,
  resolvers: {
    Subscription: {
      playlistStructureUpdates: {
        subscribe(root, args, context) {
          return eachValueFrom(getPlaylistStructurePublisher(args.playlistId))
        },
        resolve(root, args, context) {
          return root
        }
      },
      playlistStructureDiffs: {
        subscribe(root, args, context) {
          return eachValueFrom(getPlaylistStructureDiffPublisher(args.playlistId))
        },
        resolve(root, args, context) {
          return root
        }
      }
    }
  }
})
Enter fullscreen mode Exit fullscreen mode

Conclusion

In this article, we have seen how to use RxJS (or the reactive programming library of your choice)
in order to mutualize the generation of expensive-to-produce values for a GraphQL subscription but
also to trigger this processing on-demand whenever a consumer is interested in subscribing to a
specific entity updates and stop it (with or without delay) when there are no more subscribers. Note
that this technique is not exclusive to GraphQL subscriptions and can be used for any kind of
subscription.

Top comments (0)