
Mike Pearson

How to use RxJS in Qwik

YouTube video for this article

RxJS isn't supported in Qwik, but if we want to code reactively in Qwik today, we need RxJS.

So how can we use RxJS on top of Qwik in a way that follows Qwik's philosophy of optimizing every bit of code to be downloaded to the client as late as possible?

RxJS's subscription model fits well with Qwik's philosophy of fine-grained code splitting: RxJS streams do nothing until something subscribes, so why perform work—or load the code to do that work—if nobody is listening?
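
To make the "nothing happens until something subscribes" point concrete, here is a minimal sketch in plain TypeScript. It does not use RxJS; `ColdStream` is a hypothetical stand-in written only for this illustration, and it models just the laziness of a cold observable.

```typescript
type Teardown = () => void;
type Observer<T> = (value: T) => void;

// Hypothetical stand-in for a cold observable: the producer runs only on subscribe.
class ColdStream<T> {
  private readonly producer: (emit: Observer<T>) => Teardown;

  constructor(producer: (emit: Observer<T>) => Teardown) {
    this.producer = producer;
  }

  subscribe(observer: Observer<T>): Teardown {
    return this.producer(observer);
  }

  // Building a derived stream does no work either; it only wraps the producer.
  map<U>(fn: (value: T) => U): ColdStream<U> {
    return new ColdStream<U>((emit) => this.subscribe((value) => emit(fn(value))));
  }
}

let producerRuns = 0;
const numbers = new ColdStream<number>((emit) => {
  producerRuns += 1;
  emit(1);
  emit(2);
  return () => {};
});

const doubled = numbers.map((n) => n * 2);
const runsBeforeSubscribe = producerRuns; // 0: defining the streams did no work

const received: number[] = [];
const unsubscribe = doubled.subscribe((n) => {
  received.push(n);
});
unsubscribe();
// received is now [2, 4] and producerRuns is 1
```

Real RxJS observables behave the same way in this respect, which is why delaying the code download until a subscriber exists loses nothing.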

But since we can't serialize RxJS directly, when the app renders on the server it should pass subscription information all the way up the RxJS chain, storing it in Qwik stores. Then when it loads on the client, we can check if an event has subscribers before loading any of the RxJS to process it.

We can also defer loading downstream RxJS until new values get there. So if we have a typeahead with a BehaviorSubject, a debounceTime, a filter and a switchMap, we won't need to load the code for the BehaviorSubject or debounceTime until the user types, or for the switchMap until a value gets past the filter. This is how Qwik would behave if we implemented it without RxJS, using only Qwik stores and useWatch$ hooks.

This is the ideal behavior.

That'd be great

Unfortunately, after descending into a fairly deep rabbit hole, I still had no implementation that could work for all situations. This is still an open challenge. I will include my notes at the end of this article for anyone who wants to follow after me. This is definitely worth an NPM package once solved, but right now I want to focus on StateAdapt, so I won't be investing any more time in this for now.

However, there is still a way to use RxJS in Qwik, and I think it's pretty good.

The solution that works now

Server sad client happy

Since we don't know how to serialize RxJS yet, we need to set up RxJS from scratch on the client. We can't break up the streams and load each segment lazily, but we can at least wait until RxJS somewhere on the stream is required before loading the whole stream. That's still better than what other frameworks could do without a lot of work or inconvenient syntax, so we shouldn't be too sad.

Sometimes we'll need a useClientEffect$ to get some immediate RxJS behavior, but the ideal API should make it easy to switch between that and the lazy useWatch$ hook, and we should prefer useWatch$ where possible.

Whenever we define an observable, we need to define the observables it depends on. We also need all the observables that depend on it, so we can find out if there is a subscription at the bottom. This way we'll have the usual lazy RxJS behavior of no unnecessary work being done until something subscribes. This means once one observable is needed, the entire reactive graph of observables that contains that observable needs to be loaded.

The way to trigger code to load in Qwik is to set state inside a Qwik store and use a hook to react to that state change. So every observable needs a corresponding store and useWatch$ that reacts to the store by loading and running the code that defines the observable. When one observable is loaded, we need a chain reaction that causes all observable code in the graph to load, and then for the observables to be defined from top to bottom.
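
The chain reaction described here can be sketched as a small plain-TypeScript model. This is not Qwik or RxJS code: `RxNode`, `makeNode` and `settle` are hypothetical names, the `activated` and `defined` flags stand in for store properties, and the loop stands in for hooks re-running when tracked state changes.

```typescript
// Each node models one observable's store: "activated" means its code should load,
// "defined" means the observable itself now exists.
interface RxNode {
  name: string;
  deps: RxNode[]; // upstream (parent) nodes
  activated: boolean;
  defined: boolean;
}

function makeNode(name: string, deps: RxNode[] = []): RxNode {
  return { name, deps, activated: false, defined: false };
}

// Re-evaluate every node until nothing changes, like hooks reacting to store updates.
function settle(nodes: RxNode[], log: string[]): void {
  let changed = true;
  while (changed) {
    changed = false;
    for (const node of nodes) {
      const someDefined = node.deps.some((dep) => dep.defined);
      const allDefined = node.deps.every((dep) => dep.defined);

      // A parent was defined, so this node is part of a live graph.
      if (someDefined && !node.activated) {
        node.activated = true;
        log.push(`activate ${node.name}`);
        changed = true;
      }
      // An activated node pulls in all of its parents.
      if (node.activated) {
        for (const dep of node.deps) {
          if (!dep.activated) {
            dep.activated = true;
            log.push(`activate ${dep.name}`);
            changed = true;
          }
        }
      }
      // Definition flows top to bottom: only once every parent is defined.
      if (node.activated && allDefined && !node.defined) {
        node.defined = true;
        log.push(`define ${node.name}`);
        changed = true;
      }
    }
  }
}

const searchNode = makeNode("search");
const otherNode = makeNode("other");
const resultsNode = makeNode("results", [searchNode, otherNode]);
const viewNode = makeNode("view", [resultsNode]);

searchNode.activated = true; // e.g. the user typed
const eventLog: string[] = [];
settle([searchNode, otherNode, resultsNode, viewNode], eventLog);
```

In this run, activating `search` ends up pulling in `other` as well, and `results` is only defined after both of its parents are.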

So, to determine the exact architecture of this system, let's look at the different situations that might first trigger some RxJS behavior.

Example 1: User input

Imagine a typeahead that uses this RxJS:

const search$ = new BehaviorSubject('');

const results$ = search$.pipe(
  debounceTime(500),
  filter((search) => !!search.length),
  distinctUntilChanged(),
  switchMap((search) => fetchItems(search)),
);

Qwik stores can hold state just like a BehaviorSubject, so we don't need to define the BehaviorSubject until the user types.

When the user types, we can set a Qwik store's value property and track it inside a useWatch$. Inside the useWatch$ we can define the BehaviorSubject.

Immediately after the BehaviorSubject is defined, we need to define the results$ observable. We need a generic hook that can react to any parent observable being defined. So we need to assign the parent observable to a Qwik store property that can be tracked. But how can we do this if observables can't be serialized? Well, it turns out that Qwik stores can contain unserializable values if you pass them through Qwik's noSerialize function first.

So, when the user types, let's have a property that we can define like this:

store.rx = noSerialize(new BehaviorSubject(initialValue));

Now we can have all the downstream RxJS load because it's tracking the rx properties of its dependency observable stores.

Here's what this looks like:

RxJS in Qwik Demo 1

Each box represents a Qwik store. When it turns blue, its rx property has been defined. I added a delay of 2000 ms to this so you can see how it propagates down the chain of RxJS Qwik stores.

Here's the syntax for making this happen:

export default component$(() => {
  console.log("rerendering");
  const searchA = useSubject("searchA");

  const results = useStream$(
    () =>
      rx(searchA).pipe(
        debounceTime(500),
        filter((search) => !!search.length),
        distinctUntilChanged(),
        switchMap(search => fetchItems(search))
      ),
    [[] as Result[], [searchA]]
  );

  const resultsOut = useSubscribe(results);

  return (
    <>
      {/* ... */}
    </>
  );
});

There's a lot to explain here, so let's go through it and the source code for making it happen.

useSubject

useSubject is our custom hook that defines the BehaviorSubject store. Before we look at that source code, here are some simple utilities it uses:

RxStore is the shape of the Qwik stores we're creating for RxJS:

// utils/rx-store.type.ts
import { NoSerialize } from "@builder.io/qwik";
import { BehaviorSubject, Observable } from "rxjs";

export interface RxStore<T> {
  value: T;
  rx: NoSerialize<BehaviorSubject<T> | Observable<T>>;
  activated: boolean;
}

propagationDelay is a constant I'm using in a few files, so I created a dedicated file for it, but you can ignore it and remove the setTimeout around it whenever you see it.

createRx is a simple utility function I made for ensuring that a BehaviorSubject is defined on the rx property of an RxStore. It might be a terrible name. But here it is:

// utils/create-rx.function.ts
import { noSerialize } from "@builder.io/qwik";
import { BehaviorSubject } from "rxjs";
import { RxStore } from "./rx-store.type";

export function createRx<T>(store: RxStore<T>, value: T) {
  return noSerialize(store.rx || new BehaviorSubject<T>(value));
}

Now we're ready for useSubject:

// utils/use-subject.function.ts
import { useStore, useWatch$ } from "@builder.io/qwik";
import { BehaviorSubject } from "rxjs";
import { createRx } from "./create-rx.function";
import { propagationDelay } from "./propagation-delay.const";
import { RxStore } from "./rx-store.type";

export function useSubject<T>(initialState: T): RxStore<T> {
  const store: RxStore<T> = useStore({
    rx: undefined,
    value: initialState,
    activated: false,
  });

  useWatch$(({ track }) => {
    const value = track(store, "value");
    if (value !== initialState) {
      store.activated = true;

      if (store.rx) {
        (store.rx as BehaviorSubject<T>).next(value);
      } else {
        store.rx = createRx(store, value);
      }
    }
  });

  useWatch$(({ track }) => {
    const activated = track(store, "activated"); // If dependent needs this, it will set activated = true
    if (activated) {
      setTimeout(() => {
        store.rx = createRx(store, store.value);
      }, propagationDelay);
    }
  });

  return store;
}

Child stores will react to rx being defined, and parent stores will react to activated being set to true. The useStream$ section below explains this more.

useStream$

Before we look at useStream$, we need to consider another situation: Combined observables.

Let's say results$ switch maps to another observable defined with a Qwik store. In order to define results$, we need to trigger the inner observable's code to be fetched. My first thought was to create a subscribed property so observables below could set it and trigger the code to load, but that would be confused with actual RxJS subscriptions, and they're not the same thing—the inner observable needs to be defined before it gets an actual subscription. So, let's make an activated property, and have the same useWatch$ that defines the observable track it.

This is what that behavior looks like:

RxJS in Qwik Demo 2

The red outline is when the store has been activated. Again, I've added a 2000 ms delay so you can see how all of this works. Normally there will be no delay.

Now we're ready for the code that makes this work. useStream$ also uses a lot of utilities, so let's go through those.

qrlToRx is a function that converts Qwik's QRL promises into observables. This is useful whenever you need to pass unserializable code into an RxJS stream through a function.

// utils/qrl-to-rx.function.ts
import { QRL } from "@builder.io/qwik";
import { from, Observable, switchAll } from "rxjs";

export function qrlToRx<T>(fn: QRL<() => Observable<T>>) {
  return () => from(fn()).pipe(switchAll());
}

useRxStore is a function that defines a Qwik store for RxJS, but without the BehaviorSubject logic:

// utils/use-rx-store.function.ts
import { useStore } from "@builder.io/qwik";
import { RxStore } from "./rx-store.type";

export function useRxStore<T>(initialState: T): RxStore<T> {
  const store = useStore({
    rx: undefined,
    value: initialState,
    activated: false,
  });

  return store;
}

Alright, now we're ready for the big useStream$ function. I added comments throughout to explain what it does, since it's so big.

// utils/use-stream.function.ts
import {
  implicit$FirstArg,
  noSerialize,
  QRL,
  useWatch$,
} from "@builder.io/qwik";
import { Observable, tap } from "rxjs";
import { propagationDelay } from "./propagation-delay.const";
import { qrlToRx } from "./qrl-to-rx.function";
import { RxStore } from "./rx-store.type";
import { useRxStore } from "./use-rx-store.function";

export function useStreamQrl<T>(
  fn: QRL<() => Observable<T>>,
  [initialState, deps]: [T, RxStore<any>[]]
): RxStore<T> {
  const store = useRxStore(initialState);

  useWatch$(({ track }) => {
    // Tracking store.activated and all dep rx's
    // Will not run until a child activates store or a parent rx gets defined
    // If activated or parent defined, activate all parents
    // If all parents defined, define this

    const activated = track(store, "activated");
    const [someDefined, allDefined, allActivated] = deps.reduce(
      (acc, source) => {
        const rx = track(source, "rx");
        acc[0] = acc[0] || !!rx;
        acc[1] = acc[1] && !!rx;
        acc[2] = acc[2] && source.activated;
        return acc;
      },
      [false, true, true]
    );

    if (someDefined) {
      setTimeout(() => {
        store.activated = true;
      }, propagationDelay);
    }

    if (activated && !allActivated) {
      // Activated from parent => someDefined becomes true
      // Activated from child => activated becomes true
      setTimeout(() => {
        deps.forEach(source => {
          source.activated = true;
        });
      }, propagationDelay);
    }

    if (allDefined) {
      setTimeout(() => {
        store.activated = true;
        store.rx = noSerialize(
          qrlToRx(fn)().pipe(tap(value => (store.value = value)))
        );
      }, propagationDelay);
    }
  });

  return store;
}

export const useStream$ = implicit$FirstArg(useStreamQrl);

That was complicated, so hopefully it makes sense! Comment if anything is unclear.

useSubscribe

Finally, we need to subscribe to the stream.

Again, this one has some utilities. It's not obvious yet, but there's a reason I split them out. But first let's look at the hook itself:

// utils/use-subscribe.function.ts
import { useStore, useWatch$ } from "@builder.io/qwik";
import { RxStore } from "./rx-store.type";
import { subscribe } from "./subscribe.function";

export function useSubscribe<T>(store: RxStore<T>) {
  const outputStore = useStore({ value: store.value });

  useWatch$(args => {
    subscribe(args, store, outputStore);
  });

  return outputStore;
}

So we have an input RxStore (with an rx property) to subscribe to, and a new Qwik store that will contain the results.

Here's subscribe:

// utils/subscribe.function.ts
import { RxStore } from "./rx-store.type";
import { WatchArgs } from "./watch-args.type";

export function subscribe<T>(
  { track, cleanup }: WatchArgs,
  store: RxStore<T>,
  outputStore: { value: T }
) {
  const rx = track(store, "rx");
  if (rx) {
    const sub = rx.subscribe((val) => (outputStore.value = val));
    cleanup(() => sub.unsubscribe());
  }
}
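
The `WatchArgs` type imported above is never shown in this article. As a rough sketch, assuming it only needs the `track` and `cleanup` members that `subscribe` destructures, it could look like the interface below. Everything here is a stand-in: `MiniObservable`, `MiniRxStore`, `subscribeSketch`, `makeSource` and the fake watch arguments are hypothetical and exist only so the logic can run without Qwik or RxJS.

```typescript
// Hypothetical shape for utils/watch-args.type.ts (an assumption, not the author's file).
interface WatchArgs {
  track: <S extends object, K extends keyof S>(store: S, prop: K) => S[K];
  cleanup: (fn: () => void) => void;
}

// Minimal stand-ins for the RxJS and RxStore types.
interface MiniSubscription {
  unsubscribe(): void;
}
interface MiniObservable<T> {
  subscribe(next: (value: T) => void): MiniSubscription;
}
interface MiniRxStore<T> {
  value: T;
  rx: MiniObservable<T> | undefined;
  activated: boolean;
}

// Same logic as subscribe, typed against the stand-ins.
function subscribeSketch<T>(
  { track, cleanup }: WatchArgs,
  store: MiniRxStore<T>,
  outputStore: { value: T }
): void {
  const rx = track(store, "rx");
  if (rx) {
    const sub = rx.subscribe((val) => {
      outputStore.value = val;
    });
    cleanup(() => sub.unsubscribe());
  }
}

// A hand-rolled source that values can be pushed into.
function makeSource<T>() {
  const listeners = new Set<(value: T) => void>();
  const observable: MiniObservable<T> = {
    subscribe(next) {
      listeners.add(next);
      return {
        unsubscribe: () => {
          listeners.delete(next);
        },
      };
    },
  };
  const emit = (value: T): void => {
    listeners.forEach((listener) => listener(value));
  };
  return { observable, emit };
}

// Fake watch arguments: track just reads the property, cleanup collects callbacks.
const cleanups: Array<() => void> = [];
const fakeArgs: WatchArgs = {
  track: (store, prop) => store[prop],
  cleanup: (fn) => {
    cleanups.push(fn);
  },
};

const source = makeSource<number>();
const inputStore: MiniRxStore<number> = { value: 0, rx: source.observable, activated: true };
const outputStore = { value: 0 };

subscribeSketch(fakeArgs, inputStore, outputStore);
source.emit(42);
const whileSubscribed = outputStore.value; // 42
cleanups.forEach((fn) => fn()); // stands in for the framework running cleanup
source.emit(7);
const afterCleanup = outputStore.value; // still 42: the subscription is gone
```

Keeping `subscribe` free of hooks, as the article does, is what makes it possible to drive it with fake arguments like this.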

Example 2: Client subscription

What if we have a timer we want to have start as soon as the component is visible on the client? We can define a useSubscribeClient hook that does the same thing as the useSubscribe hook above, but also has a useClientEffect$ that sets the activated property of the parent stream's store. Here's what that looks like:

RxJS in Qwik Demo 3

And here's the implementation. It's the same as useSubscribe, but we add a client effect to activate the parent observable immediately:

import { useClientEffect$ } from "@builder.io/qwik";
import { RxStore } from "./rx-store.type";
import { useSubscribe } from "./use-subscribe.function";

export function useSubscribeClient<T>(store: RxStore<T>) {
  useClientEffect$(() => {
    store.activated = true;
  });
  return useSubscribe(store);
}

How do we define the timer though? We'll treat it as an independent observable with a subscription of its own, but by default we'll wait for it to be activated by observables below. We can call it useStreamToSubject$. Here's how I implemented it:

// utils/use-stream-to-subject.function.ts
import { implicit$FirstArg, QRL, useWatch$ } from "@builder.io/qwik";
import { Observable } from "rxjs";
import { qrlToRx } from "./qrl-to-rx.function";
import { RxStore } from "./rx-store.type";
import { streamToSubject } from "./stream-to-subject.function";
import { useSubject } from "./use-subject.function";

export function useStreamToSubjectQrl<T>(
  fn: QRL<() => Observable<T>>,
  initialState: T
): RxStore<T> {
  const store = useSubject(initialState);
  useWatch$((args) => streamToSubject(args, store, qrlToRx(fn)()));
  return store;
}
export const useStreamToSubject$ = implicit$FirstArg(useStreamToSubjectQrl);

There's only one function in there we haven't gone over yet:

// utils/stream-to-subject.function.ts
import { BehaviorSubject, finalize, Observable } from "rxjs";
import { RxStore } from "./rx-store.type";
import { WatchArgs } from "./watch-args.type";

export function streamToSubject<T>(
  { cleanup, track }: WatchArgs,
  store: RxStore<T>,
  obs$: Observable<T>
) {
  // Do nothing until a dependent store has activated this one
  if (!track(store, "activated")) return;
  const sub = obs$
    .pipe(finalize(() => (store.rx as BehaviorSubject<T>)?.complete?.()))
    .subscribe(n => (store.value = n));
  cleanup(() => sub.unsubscribe());
}

As you can see, it waits to be activated from below, then it calls the function passed into it to define the observable. An example usage:

const intervalStore = useStreamToSubject$(() => interval(3 * 1000), 0);

Example 3: Independent observable emits

What if we don't want to load the entire RxJS stream until the timer emits? As mentioned at the beginning of this article, I don't have a general solution to lazy-loading segments of the same observable graph independently, but for the case of an observable that can be treated independently and obviously has a subscription, we can define a hook that will subscribe to an observable and assign the emitted values to a Qwik store's value property, similar to the way we handled user input. Is this a good idea, from a code organization point of view? I have no idea. But here's how the behavior looks:

RxJS in Qwik Demo 4

And here's the implementation:

// utils/use-stream-to-subject-client.function.ts
import { implicit$FirstArg, QRL, useClientEffect$ } from "@builder.io/qwik";
import { Observable } from "rxjs";
import { qrlToRx } from "./qrl-to-rx.function";
import { RxStore } from "./rx-store.type";
import { streamToSubjectClient } from "./stream-to-subject-client.function";
import { useSubject } from "./use-subject.function";

export function useStreamToSubjectClientQrl<T>(
  fn: QRL<() => Observable<T>>,
  initialState: T
): RxStore<T> {
  const store = useSubject(initialState);
  useClientEffect$((args) => streamToSubjectClient(args, store, qrlToRx(fn)()));
  return store;
}
export const useStreamToSubjectClient$ = implicit$FirstArg(
  useStreamToSubjectClientQrl
);

Very similar to useStreamToSubject$, but it calls streamToSubjectClient instead, and does it inside a useClientEffect$ so it runs immediately. Here's that implementation:

// utils/stream-to-subject-client.function.ts
import { Observable } from "rxjs";
import { RxStore } from "./rx-store.type";
import { WatchArgs } from "./watch-args.type";

export function streamToSubjectClient<T>(
  { cleanup }: WatchArgs,
  store: RxStore<T>,
  obs$: Observable<T>
) {
  const sub = obs$.subscribe(n => (store.value = n));
  cleanup(() => sub.unsubscribe());
}

It's probably not worth having in a separate file, but I made it early on and never changed it. It's fine how it is I guess.

Example 4: Qwik store as observable

Qwik stores are synchronously reactive, so there will be many features that don't need RxJS. However, if we do end up needing some asynchronous reactivity from RxJS, wouldn't it be nice if we could just add it and use a regular Qwik store as if it were an RxJS Qwik store?

We'd want to be able to treat any property as an observable, so we'll need to provide both the store and the property we want to "chain off of" with our RxJS code. So the code would look like this:

const normalQwikStore = useStore({ prop: 'val' });
const rxStoreFromQwik = useSubjectFromStore([normalQwikStore, 'prop']);
// or
const store = useStore({ value: 'val' });
const rxStore = useSubjectFromStore(store);

Now rxStore is ready to be used inside useStream$.

And here's that implementation:

// utils/use-subject-from-store.function.ts
import { useWatch$ } from "@builder.io/qwik";
import { RxStore } from "./rx-store.type";
import { useSubject } from "./use-subject.function";

export interface UseSubjectFromStore<T> {
  <Prop extends string, Store extends Record<Prop, T>>(
    input: { value: T } | [Store, Prop]
  ): RxStore<T>;
}

export function useSubjectFromStore<T>(
  input: Parameters<UseSubjectFromStore<T>>[0]
) {
  const isArray = Array.isArray(input);
  const initialValue = isArray ? input[0][input[1]] : input.value;
  const store = useSubject(initialValue);

  useWatch$(({ track }) => {
    const value = isArray ? track(input[0], input[1]) : track(input, "value");
    store.value = value;
  });

  return store;
}

Example 5: Operator only

Server happy client happy

RxJS has 114 operators. While its primary benefit is enabling code to be structured reactively, those 114 operators can add a lot of convenience. So what if we want to mostly use Qwik stores, but reach for a single RxJS operator now and then?

This approach basically ignores RxJS's laziness by assuming everything already has a subscription. For inputs it takes in an operator, then a Qwik store and property tuple; and it outputs another Qwik store, with the value property containing the emitted value. Like this:

const store = useStore({ n: 0 });
const nEven = usePipe$(() => filter(n => n % 2 === 0), [store, 'n']);

Again, we can assume a default property of value for the input store.

We are only taking in a single operator, but we could use it with custom operators to get more RxJS into a single store:

const results = usePipe$(
    () => stream =>
      stream.pipe(
        tap(console.log),
        map(n => n * 10),
        filter(n => n >= 100),
      ),
    normalQwikStore,
  );

Now, here's the implementation:

// utils/use-pipe.function.ts
import { implicit$FirstArg, QRL, useStore, useWatch$ } from "@builder.io/qwik";
import { OperatorFunction } from "rxjs";
import {
  UseSubjectFromStore,
  useSubjectFromStore,
} from "./use-subject-from-store.function";

export function usePipeQrl<T, U>(
  operatorQrl: QRL<() => OperatorFunction<T, U>>,
  input: Parameters<UseSubjectFromStore<T>>[0]
) {
  const rxStore = useSubjectFromStore(input);
  const outputStore = useStore({ value: undefined as any }); // Final operator output

  useWatch$(({ track, cleanup }) => {
    const rx = track(rxStore, "rx");
    if (rx) {
      operatorQrl().then(operator => {
        const sub = operator(rx).subscribe(
          value => (outputStore.value = value)
        );
        cleanup(() => sub.unsubscribe());
      });
    }
  });

  return outputStore;
}

export const usePipe$ = implicit$FirstArg(usePipeQrl);

This might be the most promising way to use RxJS in Qwik today, since it puts Qwik's reactivity at the center. It could be that the ideal approach to asynchronous reactivity in Qwik isn't pure RxJS, but primarily a Qwik hook model. With hooks, calling them implies interest in their values, so it's similar to subscribing; just not as granular. So if that becomes the new "subscription" model, then RxJS just becomes a utility library that lets us code declaratively for async logic without having to implement all of that on our own up front (see my previous article). And since it's Qwik-centric, over time, each usePipe$ call could be replaced with a more Qwik-native solution, maybe involving signals, to get more fine-grained rendering.

This might be awesome.

Follow me if you want to hear when I figure out more stuff with this.


And that's what I have for now.

Now I'll include the notes for the ideal solution that I never found.


Notes

The ideal solution

Note: Qwik seems to be rapidly evolving still. Will anything with this RxJS stuff need to be redesigned? How can RxJS work optimally with signals? Apparently unresolved promises will soon be serializable in Qwik. What does that look like? It might be worth waiting for the dust to settle a little more before investing too much time into anything yet. However, these notes might remain relevant.

To implement the ideal behavior, we need to treat each segment of the RxJS stream independently, with its own internal subscription, and output to a Qwik store so we can trigger a useWatch$ to load and run the next RxJS segment after the current segment has done its work.

We also need to be able to load the streams from the bottom up, so the initial subscription information can propagate correctly. For example, if an observable is defined using a switchMap, the inner observable may not immediately be subscribed to, so on the client we should ignore the events at the top of that stream until the switchMap runs and a subscription gets passed up.

So, the overall process is this: A useSubscribe hook initially runs on the server, generating a trail of subscription state up to the tops of the streams. The app is serialized, including these subscription trails. When the app is resumed on the client, only RxJS streams that are on the subscription trails will be loaded, and only once they're required to process new events. As values trickle down the RxJS pipes, RxJS code at each segment is loaded, and if the streams are combined, the subscriptions are propagated up to the tops of the adjacent streams as needed, in the same process that was initially done for the first stream.

I imagine a syntax like this:

const search = useSubject$('');
const debouncedSearch = useStream$(() => rx => rx(search).pipe(debounceTime(500)));

rx allows us to avoid providing the Qwik stores as explicit dependencies, making this syntax more DRY; actually the only way we can provide Qwik stores as proper dependencies is implicitly inside the Rx stream anyway, because we don't want to eagerly subscribe to anything. Consider this example:


const mode = useSubject$(Mode.Simple); // enum
const simple = useObservable(simple$);
const complex = useObservable(complex$);

const results$ = useStream$(() => rx => rx(mode).pipe(
  switchMap(m => m === Mode.Simple ? rx(simple) : rx(complex)),
));

We would not want Qwik to set up a subscription to complex$ unless mode.value was set to Mode.Complex.

The only way to properly propagate subscriptions up to the right Qwik stores is by using RxJS's own subscription mechanism.

So, how do we know when RxJS actually subscribes, so that we can perform a side effect at that moment? That would be with defer. Then we can use a finalize to clean up afterwards.

So, this can all run on the server initially, and set up the tracks correctly, then only awake on the client when what's being tracked changes. But what changes? When a parent observable is defined, we're still not interested in it yet; we only care when it emits a value. So we need to set a Qwik store property at the end of that, so it can be tracked. So for each RxJS stream, we need a store to go along with it, and a subscription like this:

store.sub = stream$.subscribe(value => store.value = value)

So, rx needs to track the parent store's value property.

But here's a tricky situation on top of all of this: What if the event that causes the RxJS to first load on the client ends up passing a value downstream that changes the subscriptions to other stores? How do we update the Qwik stores so they update these subscriptions and don't load unnecessary RxJS or result in bugs from unexpected RxJS work?

We need a global context that tracks every store and its subscriptions. That means each store needs a unique ID. That can be provided in context as well, so whenever we use an RxJS hook, we need to call useContext to get an id for the new Qwik store corresponding with the RxJS pipe. Then whenever RxJS subscriptions are updated, we'll update this global Qwik store with the appropriate subscription information. This can be tracked, so a store can detect if something suddenly subscribes to it. Then it can activate its RxJS and set its value property.
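
As a rough illustration of the bookkeeping this would need, here is a plain-TypeScript model of a registry keyed by store ID that counts subscribers. It is only a sketch: `subscriptionRegistry`, `retain` and `hasSubscribers` are hypothetical names, and a real version would live in a Qwik context and be driven by RxJS subscribe and teardown, which this model leaves out.

```typescript
// One entry per store id; only the subscriber count is modeled here.
interface SubscriptionEntry {
  subscribers: number;
}

// Hypothetical global registry standing in for the global context.
const subscriptionRegistry = new Map<string, SubscriptionEntry>();

function getEntry(id: string): SubscriptionEntry {
  let entry = subscriptionRegistry.get(id);
  if (!entry) {
    entry = { subscribers: 0 };
    subscriptionRegistry.set(id, entry);
  }
  return entry;
}

// Call when something subscribes to the store with this id.
// Returns a release function for teardown; calling it more than once is harmless.
function retain(id: string): () => void {
  const entry = getEntry(id);
  entry.subscribers += 1;
  let released = false;
  return () => {
    if (released) return;
    released = true;
    entry.subscribers -= 1;
  };
}

function hasSubscribers(id: string): boolean {
  return getEntry(id).subscribers > 0;
}

const releaseA = retain("search");
const releaseB = retain("search");
releaseA();
releaseA(); // ignored: this subscription was already released
const stillSubscribed = hasSubscribers("search"); // true, B is still active
releaseB();
const afterAllReleased = hasSubscribers("search"); // false
```

A store could then check `hasSubscribers` for its own ID before loading any RxJS code for an incoming event.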

So rx might have an implementation that would return something like this:

defer(() => {
  // Get parent store from global context; increment subscriber count
  // Track parent store `value` property
  // Add store id to a dep list in this store

  // return BehaviorSubject representing parent 
}).pipe(finalize(() => {
  // Get parent store from global context; decrement subscriber count
  // Set store.value to undefined?
  // Remove store id from dep list in this store
  // 
}))

I forgot to explain why I have this comment in there: // Add store id to a dep list in this store. Why does the current store also need a list of the stores it depends on? We need to call track inside our defer so that Qwik will call our useWatch$ on the client when the parent observable emits a value. This gives us the ability to load the RxJS and define it on the client. We need to set aside the store dependency IDs before subscribing to the new stream, because the initial values on the client might cause a change in dependencies, and we'll need to update the global context with that changed subscription information. But once we define the RxJS stream, it's not going to be called again; so when a parent store updates a value, it's not going to be tracked next time around. This could potentially be a problem. Maybe we can also track the dependencies and use that to track the global context... but does that mean we need to store the RxJS emitted values in the global context as well? And can we really do tracking this dynamically? And is this efficient enough?

At this point my thinking gets really muddled. This was very much a work in progress. I've gone between a few different architectures, but I don't feel that I'm up to this challenge at the moment. I'm going to wait for Qwik to mature more before investing more time into this. For now I'm going to focus on StateAdapt again.

My goal here was just to get my thoughts out there, and if they're useful to anyone who wants to try harder than I have, I hope they at least provide a starting point. I might be close or far from a good solution. But I don't even feel like I've considered all possible scenarios or challenges.

Other challenges

What about streams that complete? How do you update subscriptions when that happens?

How do you handle errors?

Other challenges?

Implementation

Every time a new subscription occurs, we need to load the entire upstream RxJS chain. To know which exact path needs to be loaded, we need to load and define the RxJS stream at each segment first, and follow the subscription to whatever upstream RxJS it goes to. Since the next level up isn't defined yet, we need a way to intercept the reference and subscription to the next segment's observable, and increase a subscriber count in that stream's Qwik store (which will trigger the same process for that segment) and return an observable of that Qwik store's value property, which will eventually contain the output of that segment's stream.

and at each segment add to a subscriber count in a Qwik store. Every time a finalize gets called, we'll decrement the subscriber count. When an app is serialized, it will

As long as this subscriber count isn't rendered in JSX, Qwik will not call the cleanup functions associated with it when the app is serialized.


Propagating and storing subscription data is not a simple task. For each segment of lazy-loaded RxJS, we need a Qwik store that can react to changes in the Qwik stores containing its upstream observables. However, depending on how the RxJS is defined, not all the upstream Qwik stores should have a subscription passed on to them.

But we also don't want to just load the entire RxJS stream to pass the subscription up in the natural RxJS way either, because again, we'd have to load the entire stream to do that.


I may have mixed in some article notes with that as well, but that's okay. You know my mindset a bit more maybe.

Thanks for reading! Go check out StateAdapt and give it a star :)
