DEV Community

Cover image for tRPC 102: Subscription/Web Socket
Yuko
Yuko

Posted on

tRPC 102: Subscription/Web Socket

This is my memo when I implemented a subscription with tRPC along with the official documents. I also referred to the code example provided in the document.

1. Add onUpdate subscription

    // src/server/api/routers/tweet.ts
    import { EventEmitter } from "events";
    import { z } from "zod";
    import { observable } from "@trpc/server/observable";
    import { type Tweet } from "@prisma/client";
    import {
      createTRPCRouter,
      publicProcedure,
      protectedProcedure,
    } from "~/server/api/trpc";

    // create a global event emitter (could be replaced by redis, etc)
    const ee = new EventEmitter();

    export const tweetRouter = createTRPCRouter({
      ...others

      onUpdate: publicProcedure.subscription(() => {
        // return an `observable` with a callback which is triggered immediately
        return observable<Tweet>((emit) => {
          const onAdd = (data: Tweet) => {
            // emit data to client
            emit.next(data);
          };
          // trigger `onAdd()` when `add` is triggered in our event emitter
          ee.on("add", onAdd);

          // unsubscribe function when client disconnects or stops subscribing
          return () => {
            ee.off("add", onAdd);
          };
        });
      }),
    });
Enter fullscreen mode Exit fullscreen mode

2. Creating a WebSocket-server

    // src/server/wsServer.ts
    import { applyWSSHandler } from "@trpc/server/adapters/ws";
    import ws from "ws";
    import { appRouter } from "./api/root";
    import { createTRPCContext } from "./api/trpc";

    const wss = new ws.Server({
      port: 3000,
    });
    const handler = applyWSSHandler({ wss, router: appRouter, createTRPCContext });

    wss.on("connection", (ws) => {
      console.log(`➕➕ Connection (${wss.clients.size})`);
      ws.once("close", () => {
        console.log(`➖➖ Connection (${wss.clients.size})`);
      });
    });
    console.log("✅ WebSocket Server listening on ws://localhost:3001");

    process.on("SIGTERM", () => {
      console.log("SIGTERM");
      handler.broadcastReconnectNotification();
      wss.close();
    });
Enter fullscreen mode Exit fullscreen mode

3. Setting TRPCClient to use WebSockets

    // src/utils/api.ts
    import {
      httpBatchLink,
      loggerLink,
      wsLink, // ADD
      createWSClient, // ADD
    } from "@trpc/client";
    import { createTRPCNext } from "@trpc/next";
    import { type inferRouterInputs, type inferRouterOutputs } from "@trpc/server";
    import superjson from "superjson";
    import { type AppRouter } from "~/server/api/root";

    // ADD: create persistent WebSocket connection
    const wsClient = createWSClient({
      url: `ws://localhost:3000`,
    });

    const getBaseUrl = () => {
      if (typeof window !== "undefined") return ""; 
      if (process.env.VERCEL_URL) return `https://${process.env.VERCEL_URL}`; 
      return `http://localhost:${process.env.PORT ?? 3000}`; 

    export const api = createTRPCNext<AppRouter>({
      config() {
        return {
          transformer: superjson,
          links: [
            loggerLink({
              enabled: (opts) =>
                process.env.NODE_ENV === "development" ||
                (opts.direction === "down" && opts.result instanceof Error),
            }),
            httpBatchLink({
              url: `${getBaseUrl()}/api/trpc`,
            }),
            // ADD weLink here
            wsLink({
              client: wsClient,
            }),
          ],
        };
      },
      ssr: false,
    });
Enter fullscreen mode Exit fullscreen mode

4. Use subscription

    import { useEffect, useState } from "react";
    import Head from "next/head";
    import { api } from "~/utils/api";
    import TweetItem from "~/components/TweetItem";
    import { type RouterOutputs } from "~/utils/api";
    type Tweets = RouterOutputs["tweet"]["all"];

    export default function Home() {
      const { data } = api.tweet.all.useQuery<Tweets>();
      // ADD
      const [tweets, setTweets] = useState<Tweets>([]);

     // ADD
      api.tweet.onUpdate.useSubscription(undefined, {
        onData(tweet) {
          setTweets((prev) => [tweet, ...prev]);
        },
      });

      useEffect(() => {
        if (data) {
          setTweets(data);
        }
      }, [data]);

      return (
        <>
          <Head>
            <title>My T3 Twitter Lite</title>
            <meta
              name="description"
              content="Very lite Twitter-ish app generated by create-t3-app"
            />
            <link rel="icon" href="/favicon.ico" />
          </Head>
          <div className="container mx-auto px-4 py-16">
            <div className="mt-8 flex w-full flex-col items-center gap-4">
              {tweets ? (
                tweets.length === 0 ? (
                  <p className="text-2xl text-blue-700">No tweets found</p>
                ) : (
                  <>
                    {tweets.map((tweet) => (
                      <TweetItem tweet={tweet} key={tweet.id} />
                    ))}
                  </>
                )
              ) : (
                <p className="text-2xl text-blue-700">Loading...</p>
              )}
            </div>
          </div>
        </>
      );
    }
Enter fullscreen mode Exit fullscreen mode

Details about more basic implementation of tRPC is here.
Code is available here.

Top comments (0)