This is my memo when I implemented a subscription with tRPC along with the official documents. I also referred to the code example provided in the document.
1. Add onUpdate subscription
// src/server/api/routers/tweet.ts
import { EventEmitter } from "events";
import { z } from "zod";
import { observable } from "@trpc/server/observable";
import { type Tweet } from "@prisma/client";
import {
createTRPCRouter,
publicProcedure,
protectedProcedure,
} from "~/server/api/trpc";
// create a global event emitter (could be replaced by redis, etc)
const ee = new EventEmitter();
export const tweetRouter = createTRPCRouter({
...others
onUpdate: publicProcedure.subscription(() => {
// return an `observable` with a callback which is triggered immediately
return observable<Tweet>((emit) => {
const onAdd = (data: Tweet) => {
// emit data to client
emit.next(data);
};
// trigger `onAdd()` when `add` is triggered in our event emitter
ee.on("add", onAdd);
// unsubscribe function when client disconnects or stops subscribing
return () => {
ee.off("add", onAdd);
};
});
}),
});
2. Creating a WebSocket-server
// src/server/wsServer.ts
import { applyWSSHandler } from "@trpc/server/adapters/ws";
import ws from "ws";
import { appRouter } from "./api/root";
import { createTRPCContext } from "./api/trpc";
const wss = new ws.Server({
port: 3000,
});
const handler = applyWSSHandler({ wss, router: appRouter, createTRPCContext });
wss.on("connection", (ws) => {
console.log(`➕➕ Connection (${wss.clients.size})`);
ws.once("close", () => {
console.log(`➖➖ Connection (${wss.clients.size})`);
});
});
console.log("✅ WebSocket Server listening on ws://localhost:3001");
process.on("SIGTERM", () => {
console.log("SIGTERM");
handler.broadcastReconnectNotification();
wss.close();
});
3. Setting TRPCClient to use WebSockets
// src/utils/api.ts
import {
httpBatchLink,
loggerLink,
wsLink, // ADD
createWSClient, // ADD
} from "@trpc/client";
import { createTRPCNext } from "@trpc/next";
import { type inferRouterInputs, type inferRouterOutputs } from "@trpc/server";
import superjson from "superjson";
import { type AppRouter } from "~/server/api/root";
// ADD: create persistent WebSocket connection
const wsClient = createWSClient({
url: `ws://localhost:3000`,
});
const getBaseUrl = () => {
if (typeof window !== "undefined") return "";
if (process.env.VERCEL_URL) return `https://${process.env.VERCEL_URL}`;
return `http://localhost:${process.env.PORT ?? 3000}`;
export const api = createTRPCNext<AppRouter>({
config() {
return {
transformer: superjson,
links: [
loggerLink({
enabled: (opts) =>
process.env.NODE_ENV === "development" ||
(opts.direction === "down" && opts.result instanceof Error),
}),
httpBatchLink({
url: `${getBaseUrl()}/api/trpc`,
}),
// ADD weLink here
wsLink({
client: wsClient,
}),
],
};
},
ssr: false,
});
4. Use subscription
import { useEffect, useState } from "react";
import Head from "next/head";
import { api } from "~/utils/api";
import TweetItem from "~/components/TweetItem";
import { type RouterOutputs } from "~/utils/api";
type Tweets = RouterOutputs["tweet"]["all"];
export default function Home() {
const { data } = api.tweet.all.useQuery<Tweets>();
// ADD
const [tweets, setTweets] = useState<Tweets>([]);
// ADD
api.tweet.onUpdate.useSubscription(undefined, {
onData(tweet) {
setTweets((prev) => [tweet, ...prev]);
},
});
useEffect(() => {
if (data) {
setTweets(data);
}
}, [data]);
return (
<>
<Head>
<title>My T3 Twitter Lite</title>
<meta
name="description"
content="Very lite Twitter-ish app generated by create-t3-app"
/>
<link rel="icon" href="/favicon.ico" />
</Head>
<div className="container mx-auto px-4 py-16">
<div className="mt-8 flex w-full flex-col items-center gap-4">
{tweets ? (
tweets.length === 0 ? (
<p className="text-2xl text-blue-700">No tweets found</p>
) : (
<>
{tweets.map((tweet) => (
<TweetItem tweet={tweet} key={tweet.id} />
))}
</>
)
) : (
<p className="text-2xl text-blue-700">Loading...</p>
)}
</div>
</div>
</>
);
}
Details about more basic implementation of tRPC is here.
Code is available here.
Top comments (0)