Ildar Timerbaev

React Query for data streaming

Introduction

Most React developers know @tanstack/react-query, the popular library for managing API requests. It is mostly used for HTTP requests from the browser, but that's not the only way to use it. React Query accepts any promise-returning function in queries and mutations, which lets us combine it with data streaming. Let's get started!

Theory

Client setup

Since we want to handle data streaming, we assume some kind of WebSocket connection with STOMP on top of it. Let's imagine that we have a very simple data flow:

const stompClient = new StompClient()
stompClient.subscribe('mychannel', (event) => {...})
stompClient.publish('mychannel', {...})
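The StompClient above is imaginary, so it helps to pin down what the rest of the article expects from it. Here is a minimal in-memory sketch in TypeScript. The names InMemoryStompClient, StompSubscription and Handler are made up for illustration, and a real STOMP library would differ in its API and deliver messages through a broker rather than in-process:

```typescript
// Assumed shapes for illustration only; not the API of any real STOMP library.
type Handler = (event: unknown) => void

interface StompSubscription {
  unsubscribe(): void
}

class InMemoryStompClient {
  // Handlers grouped by channel name
  private handlers: Record<string, Handler[]> = {}

  // Registers a handler for a channel and returns a handle to remove it later
  subscribe(channel: string, handler: Handler): StompSubscription {
    this.handlers[channel] = [...(this.handlers[channel] ?? []), handler]
    return {
      unsubscribe: () => {
        this.handlers[channel] = (this.handlers[channel] ?? []).filter((h) => h !== handler)
      },
    }
  }

  // Delivers the message to every handler of the destination and returns a promise
  publish(destination: string, message: unknown): Promise<void> {
    const handlers = this.handlers[destination] ?? []
    handlers.forEach((handler) => handler(message))
    return Promise.resolve()
  }
}
```

The two properties the later snippets rely on are that subscribe returns an object with unsubscribe, and that publish returns a promise.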

To make the client accessible from any part of the application, we put it in a React context that is provided by the root component. For now, this context only contains the client instance used by our queries and mutations.

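import { createContext, useContext, useState, PropsWithChildren } from 'react'

const StompContext = createContext<{ client?: StompClient }>({
 client: undefined
})

function StompManager({ children }: PropsWithChildren) {
 // Lazy initializer: the client is created once, not on every render
 const [client] = useState(() => new StompClient())

 return <StompContext.Provider value={{ client }}>{children}</StompContext.Provider>
}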

function useStompManager() {
 const { client } = useContext(StompContext)
 return client
}

Query

Let’s revisit how queries work. They consist of an identification key, a query function, and options. Since the identification key is self-explanatory, we’ll focus on the query function and options.

Firstly, each query should only be activated when our client is ready.

const client = useStompManager()
...
useQuery([QueryKey], () => {...}, { enabled: !!client })

Secondly, each query requires a function to subscribe to the channel and wait for new data. There are several different subscription types:

  1. One event subscription — This is a pattern where we receive something from the channel and then complete the subscription (e.g., getting a user profile).
  2. Infinite event subscription — This is a pattern where we listen to the channel until it’s unsubscribed or cancelled (e.g., listening to chat messages). Each subscription should be stored in a subscriptions storage to avoid generating multiple subscriptions on each query call.

Since some subscriptions never complete on their own, we need a subscription store in the StompContext. For this, I'll use the useMap hook from the react-use package. Let's establish a rule that the subscription identifier is the query key.

const [subscribers, { set, get }] = useMap<Record<string, boolean>>();
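Query keys are usually arrays, and two equal arrays are still different object references, so one way to follow the rule above is to serialize the key into a string. Below is a sketch in plain TypeScript, outside React. SubscriptionRegistry and StompSubscription are hypothetical names, and JSON.stringify is a simplification that is sensitive to object key order:

```typescript
// Assumed minimal subscription shape for illustration
interface StompSubscription {
  unsubscribe(): void
}

class SubscriptionRegistry {
  private subscriptions: Record<string, StompSubscription> = {}

  // Serialize the key so that equal arrays map to the same entry
  private toId(queryKey: readonly unknown[]): string {
    return JSON.stringify(queryKey)
  }

  // Calls `subscribe` only when the key has no stored subscription; returns true if it subscribed
  ensure(queryKey: readonly unknown[], subscribe: () => StompSubscription): boolean {
    const id = this.toId(queryKey)
    if (this.subscriptions[id]) {
      return false
    }
    this.subscriptions[id] = subscribe()
    return true
  }

  // Unsubscribes and forgets the entry, so a later `ensure` subscribes again
  release(queryKey: readonly unknown[]): void {
    const id = this.toId(queryKey)
    this.subscriptions[id]?.unsubscribe()
    delete this.subscriptions[id]
  }
}
```

The same idea carries over to the useMap store: the serialized key becomes the map key, and the stored value is the subscription.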

Mutation

The implementation of mutations can vary depending on the protocol used. For instance, a chat server might send the message back on a channel to indicate successful transmission. That is a special case, though. If the STOMP client has a publish method that returns a promise, the mutation doesn't need any specific logic.

Sending without confirmation

function useStompMutation(mutationKey: string, destination: string) {
  // get the client
  // default useMutation hook whose mutation function sends the message
}

Sending with confirmation

function useStompConfirmableMutation(mutationKey: string, destination: string, channel: string) {
  // get the client
  // default useMutation hook with the following mutation function:
  // 0. Return a promise with resolve and reject parameters
  // 1. Subscribe to the specific channel and resolve the promise there
  // 2. Send a message based on the mutation function parameters
  // 3. Handle errors by rejecting the promise
}

Implementation

One event query

function listenOnce<DATA>(client: StompClient, channel: string) {
 return new Promise<DATA>((resolve) => {
  const sub = client.subscribe(channel, (event: DATA) => {
   // One event is enough: unsubscribe, then resolve
   sub.unsubscribe()
   resolve(event)
  })
 })
}

function useStompQuery<DATA, ERROR>(
 queryKey: QueryKey,
 channel: string,
 options?: UseQueryOptions<DATA, ERROR>
) {
 const client = useStompManager()
 return useQuery<DATA, ERROR>(
  queryKey,
  async () => {
   if (!client) {
    throw new Error('Client not found')
   }
   return listenOnce<DATA>(client, channel)
  },
  {
   ...options,
   enabled: !!client && (options?.enabled ?? true),
  }
 )
}

Infinite events query

function useStompQuery<DATA, ERROR>(
 queryKey: QueryKey,
 channel: string,
 newDataResolver: (previousData: DATA[], newComingData: DATA) => DATA[],
 options?: UseQueryOptions<DATA[], ERROR>,
) {
 const client = useStompManager()
 const { setSubscriber, getSubscriber } = useContext(StompContext)
 const queryClient = useQueryClient()

 // Hook from react-use package
 useMount(() => {
  // Subscribe only once per query key
  if (!client || getSubscriber(queryKey)) {
   return
  }

  const sub = client.subscribe(channel, (event: DATA) => {
   // The cache is empty until the first event arrives
   const previousData = queryClient.getQueryData<DATA[]>(queryKey) ?? []
   const nextData = newDataResolver(previousData, event)
   queryClient.setQueryData(queryKey, nextData)
  })
  setSubscriber(queryKey, sub)
 })

 return useQuery<DATA[], ERROR>(
  queryKey,
  // React Query v4 treats an undefined result as an error, so fall back to an empty list
  () => queryClient.getQueryData<DATA[]>(queryKey) ?? [],
  {
   ...options,
   enabled: !!client && (options?.enabled ?? true),
  }
 )
}

Let’s highlight some implementation details:

  • newDataResolver – a function that lets us control how each incoming event is merged into the cached data (e.g., whenever a new message arrives, we could mark it with a sent status);
  • () => queryClient.getQueryData<DATA[]>(queryKey) – since this query has no one-time fetching function, we have to return the query's own cached data on each refetch.
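As an illustration of newDataResolver, here is a possible resolver for chat messages. The ChatMessage shape and the appendSentMessage name are assumptions made for this sketch. It returns a new array instead of mutating the previous one, so the cache receives a new reference on every event:

```typescript
// Hypothetical message shape; not part of the original article
interface ChatMessage {
  id: string
  text: string
  status: 'pending' | 'sent'
}

// Marks the incoming message as sent and replaces any earlier copy with the same id
function appendSentMessage(previousData: ChatMessage[], incoming: ChatMessage): ChatMessage[] {
  const confirmed: ChatMessage = { ...incoming, status: 'sent' }
  const withoutDuplicate = previousData.filter((message) => message.id !== incoming.id)
  return [...withoutDuplicate, confirmed]
}
```

A resolver like this would be passed as the third argument of the hook, next to the query key and the channel.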

Mutation without confirmation

function useStompMutation<T, DATA>(
 mutationKey: string,
 destination: string,
 options?: UseMutationOptions<DATA, unknown, T>
) {
 const client = useStompManager()
 return useMutation<DATA, unknown, T>(
  [mutationKey],
  async (message: T) => {
   if (!client) {
    throw new Error('Client not found')
   }
   // Return the promise so the mutation settles when publish does
   return client.publish(destination, message)
  },
  options
 )
}

Mutation with confirmation

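function useStompConfirmableMutation<T, DATA>(
 mutationKey: string,
 destination: string,
 channel: string,
 confirmation?: (event: DATA) => boolean,
 options?: UseMutationOptions<DATA, unknown, T>
) {
 const client = useStompManager()
 return useMutation<DATA, unknown, T>(
  [mutationKey],
  async (message: T) => {
   // Throw before creating the promise; a throw inside an async executor would be lost
   if (!client) {
    throw new Error('Client not found')
   }

   return new Promise<DATA>((resolve, reject) => {
    const sub = client.subscribe(channel, (event: DATA) => {
     // Assumption: without a confirmation predicate, the first event confirms the message
     if (confirmation?.(event) ?? true) {
      sub.unsubscribe()
      resolve(event)
     }
    })

    // Promise.resolve also covers clients whose publish is synchronous
    Promise.resolve(client.publish(destination, message)).catch((e) => {
     sub.unsubscribe()
     reject(e)
    })
   })
  },
  options
 )
}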

The potential issue here is that if the confirmation event never arrives on the channel, the promise never settles. Rejecting on a timeout would solve this, but I’ll leave that up to you 🙂
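One possible way to add that timeout is a small wrapper around the confirmation promise. This is a sketch, not part of the implementation above; withTimeout and its onTimeout cleanup callback are hypothetical names:

```typescript
// Rejects if `promise` does not settle within `ms` milliseconds.
// `onTimeout` is an optional cleanup hook, e.g. for unsubscribing from the channel.
function withTimeout<T>(promise: Promise<T>, ms: number, onTimeout?: () => void): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      onTimeout?.()
      reject(new Error(`No confirmation received within ${ms} ms`))
    }, ms)

    promise.then(
      (value) => {
        // Settled in time: cancel the timer and pass the value through
        clearTimeout(timer)
        resolve(value)
      },
      (error) => {
        clearTimeout(timer)
        reject(error)
      },
    )
  })
}
```

In the confirmable mutation, the promise returned from the mutation function could be wrapped in withTimeout, with the channel subscription cancelled in onTimeout so that it does not outlive the failed mutation.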

Conclusion

We’ve successfully implemented a working solution for data streaming in React Query. However, this solution may not be ideal for all use-cases, so you might need to adjust it depending on your data streaming service implementation.

Thank you for reading!

Source: Own article on Medium
