Turn a Stream of Objects into an Object of Streams

Kostia Palchyk ・ 2 min read

Hi, fellow streamer! Today I want to share a JS/TS package that allows you to access props of objects on Observables:

source$.subscribe(o => console.log(o?.a?.b?.c))
// turn ↑ into ↓
source$.a.b.c.subscribe(console.log)

tl;dr: github.com/kosich/rxjs-proxify


A simple use case:

import { proxify } from "rxjs-proxify";
import { of } from "rxjs";

const source = of({ msg: 'Hello' }, { msg: 'World' });
const stream = proxify(source);
stream.msg.subscribe(console.log); // 'Hello', 'World'
☝️ of will create an Observable out of its arguments

The package has good TypeScript support, so all props are intelli-sensed by cats, dogs, and IDEs:

import { of } from 'rxjs';
import { proxify } from 'rxjs-proxify';

const source = of({ a:1, b:1 }, { a:2, b:2 });
const stream = proxify(source);
stream. // <- will suggest .a .b .pipe .subscribe etc
👀 I can see your intentions

It's also possible to call methods on values, e.g.:

import { proxify } from "rxjs-proxify";
import { of } from "rxjs";

const source = of({ msg: () => 'Hello' }, { msg: () => 'World' });
const stream = proxify(source);
// calls msg() fn on each value of the stream
stream.msg().subscribe(console.log); // 'Hello', 'World'
🤯 pure magic, I tell you

And you can apply RxJS operators at any depth:

import { proxify } from "rxjs-proxify";
import { of } from "rxjs";
import { scan } from "rxjs/operators";

const source = of({ msg: 'Hello' }, { msg: 'World' });
const stream = proxify(source);
stream.msg.pipe(scan((a, c)=> a + c)).subscribe(console.log); // 'Hello', 'HelloWorld'

The package uses Proxies under the hood, recursively applying them to sub-properties and method results, so the chain can be arbitrarily deep. And you can apply .subscribe or .pipe at any point in the chain!
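To make the recursive Proxy idea concrete, here is a minimal sketch. It is not the package's actual implementation: `Stream`, `fromValues`, `mapStream` and `miniProxify` are hypothetical names, and a tiny stand-in replaces the RxJS Observable so the snippet has no dependencies. Method calls and `.pipe` are left out on purpose.

```typescript
// A tiny stand-in for an Observable, so the sketch has no dependencies.
// In the real package the source would be an RxJS Observable.
interface Stream<T> {
  subscribe(next: (value: T) => void): void;
}

// Emits the given values synchronously, similar in spirit to RxJS `of`.
function fromValues<T>(...values: T[]): Stream<T> {
  return { subscribe: next => values.forEach(v => next(v)) };
}

// Maps every emitted value, similar in spirit to the RxJS `map` operator.
function mapStream<T, R>(source: Stream<T>, fn: (value: T) => R): Stream<R> {
  return { subscribe: next => source.subscribe(v => next(fn(v))) };
}

// Hypothetical, simplified proxify: `subscribe` goes to the stream itself,
// any other property access yields a new proxied stream of that property.
function miniProxify(source: Stream<any>): any {
  return new Proxy(source, {
    get(target, prop) {
      if (prop === 'subscribe') {
        return target.subscribe.bind(target);
      }
      // Optional chaining keeps null/undefined values from throwing.
      return miniProxify(mapStream(target, value => value?.[prop]));
    },
  });
}

const received: string[] = [];
const nested = fromValues({ a: { msg: 'Hello' } }, { a: { msg: 'World' } });
miniProxify(nested).a.msg.subscribe((msg: string) => received.push(msg));
// received is now ['Hello', 'World']
```

Each property access wraps a freshly mapped stream in a new Proxy, which is why the chain can go as deep as the data does.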

🎹 Try it

You can install it via npm i rxjs-proxify

Or test it online: stackblitz.com/edit/rxjs-proxify-repl

📖 Repository

The source code and more examples are available in the GitHub repo:

github.com/kosich/rxjs-proxify

Outro

Thank you for reading this article! Stay reactive and have a nice day 🙂

If you enjoyed reading — please, indicate that with ❤️ 🦄 📘 buttons

Soon I'll post a more detailed review of the lib and how it works

Follow me here and on twitter for more RxJS, React, and JS posts:

🗣 Would love to hear your thoughts!

Discussion


Does it play nicely with the <$> component? It would be a great combination. It’s a similar idea to Hookstate. And what about Subject and the .next method?

 

Hi, Franciszek!

Does it play nicely with the <$> component? It would be a great combination.

Great question! Especially since I hadn't tried it with <$> myself yet! 😄 (the idea came up for use in the recksjs framework) Now I have! Here's an example:

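import { ajax } from 'rxjs/ajax';
import { proxify } from 'rxjs-proxify';
import { $ } from 'react-rxjs-elements'; // assuming <$> is the react-rxjs-elements fragment

function getStatus() {
  return ajax.getJSON<{ message: string }>('https://api.github.com/status');
}

function App(){
  // destructuring a proxified stream yields a stream per property
  const { message } = proxify(getStatus());
  return <$>{ message }</$>
}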

^ online playground — I've added two precautions in comments, please be advised!

And what about Subject and the .next method?

Hm, I haven't considered this before 🤔 The original idea was to provide a selector, independently from provider. And the .next method would be lost after first subproperty access, e.g. stream.a — since it's map-ped under the hood. Though having the initial Subject, one can still do:

import { Subject } from 'rxjs';
import { proxify } from 'rxjs-proxify';

const state = new Subject<{ a: number, b: number }>();
const { a, b } = proxify(state);
// consume a, b in some way, and then
state.next({ a: 1, b: 2 });

THEORETICAL PART

Yet there's something to be discovered w/ state management, like:

// !pseudocode!
const [read, write] = store({ a: { b: 1 } });
read.a.b.subscribe(console.log); // log 1
write.a.b = 2; // next-s to the store, that leads to logging 2

Quite interesting concept, need to think & play around w/ this! 🤓

EOF THEORETICAL PART


Let me know what you think!

P.S: And thx for Hookstate, didn't know about it!

 

THEORETICAL UPDATE:

// create a state
const [read, write, reset] = createState({ a: { b: { c: 1 } } });

// listen to state changes
read.a.b.c
  .subscribe(c => console.log('C:', c)); // > C:1

// write to the state
write.a = { b: { c: 2 } }; // > C:2
write.a.b = { c: 3 };      // > C:3
write.a.b.c = 4;           // > C:4

// reset the state
reset({ a: { b: { c: 42 } } }); // > C:42

~ Well typed, though has bugs 🙂
Here's a playground: stackblitz.com/edit/rstate?file=in...

Wow, it looks like MobX now 🤯. I checked the code. What about changing the approach? Instead of chaining observables, we can chain the properties names and then just pluck them for subscribing. I made the demo. I also used Immer to deliver next state 😛

const state = store({ a: 1, b: { c: 2 } })

state.subscribe(console.log) // Logs: { a: 1, b: { c: 2 } } | { a: 1, b: { c: 7 } } | { a: 1, b: { c: 100 } }
state.b.c.subscribe(console.log) //Logs: 2 | 7 | 100
state.b = { c: 7 }
state.b.c = 100
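The "chain the property names, then pluck" idea can be sketched without any library. This is only an illustration under assumptions: `pathStore`, `getIn` and `setIn` are hypothetical names, plain object spreads stand in for Immer, and a simple `!==` check stands in for distinct-value filtering.

```typescript
type Listener = (value: unknown) => void;

// Hypothetical helper: read a nested value by path, e.g. ['b', 'c'].
function getIn(obj: any, path: string[]): unknown {
  return path.reduce((acc, key) => acc?.[key], obj);
}

// Hypothetical helper: copy `obj` with the value at `path` replaced.
function setIn(obj: any, path: string[], value: unknown): any {
  if (path.length === 0) return value;
  const [head, ...rest] = path;
  return { ...obj, [head]: setIn(obj?.[head], rest, value) };
}

function pathStore(initial: object): any {
  let state: any = initial;
  const notifiers = new Set<() => void>();

  // Every proxy only remembers its path; nothing is mapped until subscribe.
  const at = (path: string[]): any =>
    new Proxy({}, {
      get(_target, prop) {
        if (prop === 'subscribe') {
          return (listener: Listener) => {
            let last: unknown;
            let first = true;
            const notify = () => {
              const current = getIn(state, path);
              if (first || current !== last) {
                first = false;
                last = current;
                listener(current);
              }
            };
            notifiers.add(notify);
            notify(); // emit the current value right away
          };
        }
        return at([...path, String(prop)]);
      },
      set(_target, prop, value) {
        state = setIn(state, [...path, String(prop)], value);
        notifiers.forEach(notify => notify());
        return true;
      },
    });

  return at([]);
}

const demo = pathStore({ a: 1, b: { c: 2 } });
const seen: unknown[] = [];
demo.b.c.subscribe((c: unknown) => seen.push(c));
demo.b = { c: 7 };
demo.b.c = 100;
// seen is now [2, 7, 100]
```

Because the path is only resolved at subscription time, a deep chain costs one lookup per update instead of one mapping step per property.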

Awesome! I especially like that you've united read & write: I wanted to do it too! (but reused rxjs-proxify cz of already implemented TypeScript)

And I agree, a single pluck is nicer & more performant, though we might need a custom pluck operator if we want to support a.b.c() — since pluck won't keep the call context: this will be lost, not sure if that's a big deal
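The lost call context can be shown on a plain object, with no streams involved. In this sketch `pluckPath` and `callWithContext` are hypothetical helpers: the first mimics what a path-based pluck does to a method, the second shows one way a custom operator could keep `this`.

```typescript
const counter = {
  count: 41,
  next() {
    return this.count + 1;
  },
};

// Mimics a path-based pluck: walks the path and returns whatever is there.
function pluckPath(obj: any, path: string[]): any {
  return path.reduce((acc, key) => acc?.[key], obj);
}

// Keeps the call context: resolves the owner first, then calls the method on it.
function callWithContext(obj: any, path: string[], ...args: unknown[]): unknown {
  const owner = pluckPath(obj, path.slice(0, -1));
  const method = path[path.length - 1];
  return owner[method](...args);
}

// Plucking the method detaches it from `counter`.
const detached = pluckPath(counter, ['next']);

let detachedResult: unknown;
try {
  // `this` is no longer `counter`: this throws in strict mode (and modules)
  // and yields NaN in sloppy mode. Either way it is not 42.
  detachedResult = detached();
} catch (error) {
  detachedResult = error;
}

const boundResult = callWithContext(counter, ['next']); // 42
```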

I still have mixed feelings regarding whether pushing to state should be a.b.c = 1 or a.b.c.next(1) — latter is uglier, I admit, though it has two benefits:

  1. it makes it easy to set the root state: a.next({ b: { c: 1 } })
  2. it gives some symmetry to the API: callback effect subscription on one side, and fn call to create effect on the other (a.b = 1 is not obviously effectful)

Regardless of implementation details, do you think such an approach could be useful as a package? I think in Recks it can be handy, maybe in <$> too, not sure where else..

Will try to compile a unified solution early next week 👀

I don't get the first point. Which this will be lost? Help me understand 😅. I updated the demo with apply trap and everything seems ok to me. Besides your proxify does not keep the this of the root object if that's what you mean

When it comes to = vs .next: IMHO = with a company of .next or just .next. Keeping only = as you just said disallows setting the root state and makes the proxy less usable in higher-order scenarios like passing the chain as a prop to further set the next value.

Could it be handy as a package? Using RxJS as a standalone state manager is very rare these days. Subjects, in comparison to their relatives from MobX, seem poor. Although mixing MobX with RxJS feels a little cumbersome due to the very different subscription style. Maybe it would be better to create a store by nesting observables as MobX does?

Which this will be lost?

Sorry, now I'm confused 🙂 I see that updated demo has the "this" is lost example — I meant exactly that. With o.f() I'd expect this in f() to be o (as in object o, not Observable of o). Here's a test from proxify that might better explain it.

Yet, this is a really minor issue (if it is an issue in the first place!), easy to fix and not worth much attention.

proxify does not keep the this of the root object

Can you share an example of this? I might be missing something obvious here 🙁

Maybe it would be better to create a store by nesting observables as MobX does?

Will have to educate myself better on MobX to appreciate this (haven't worked with MobX yet, only tutorials 😅)
Give me a hint if you have some particular use case in mind 🤔

P.S: Thanks for fn?.() — I didn't know that optional chaining can be applied to function calls! That's great!

I apologize for my wicked accusations 😌. proxify does keep the this. I didn't test it beforehand, I just drew the wrong conclusion from reading the source

Phew 😅 ! That's cool! Thanks for proofreading the sources! 🙏

Hey, @fkrasnowski , sorry for bothering you again 🙂
Want to give an update:

State:

👂 listen to distinct state value changes
📝 write to state (sub)values
💫 reset state
👀 sync read current state value
👓 TS support coming

/ I've dropped fn calls with whole this issue for now 😅 and used your cool approach with pluck! 👍 /

[image: state use example]

🔗 work in progress

Autorun:

Another THEORETICAL thing born in discussions (here and on twitter for the very same proxify 🙂)

A function that is re-evaluated when an Observable used in it changes
(I think MobX's autorun does a similar thing)

[image: autorun a fn on observable change]

🔗 work in progress #2

Mix:

The two might look cool together:

[image: state w/ autotrack mix]


Let me know what you think 🤔
Take care!

I've got mixed feelings about all this autorun thing.

  1. It looks very cool. And I love this pattern. I find it clear and convenient
  2. But, RxJS already includes the combineLatest operator for the same purpose, less cool, still clear, and convenient.
  3. But, MobX fans will fall in love with it!

Comparison between some approaches:

RxJS:

// atom initialization:
const lizard$ = new BehaviorSubject('🦎');
const wizard$ = of('🧙‍♂️');

// reaction / subscription:
combineLatest([lizard$, wizard$]).subscribe(([lizard, wizard]) =>
  console.log(`king gizzard & the ${lizard} ${wizard}`)
);

// update atom:
lizard$.next('🐓');

MobX:

// atom initialization:
const lizard = observable({ emoji: '🦎' });
const wizard = observable({ emoji: '🧙‍♂️' });

// reaction / subscription:
autorun(() =>
  console.log(`king gizzard & the ${lizard.emoji} ${wizard.emoji}`)
);

// update atom:
lizard.emoji = '🐓';

Svelte:

// atom initialization:
let lizard = '🦎' 
let wizard =  '🧙‍♂️' 

// reaction / subscription:
$: console.log(`king gizzard & the ${lizard} ${wizard}`)

// update atom:
lizard = '🐓';

It might look like the MobX and Svelte strategies are better, due to their terseness. But RxJS is about operators! And its full force lies in them. Your solution:

// atom initialization:
const lizard$ = new BehaviorSubject('🦎');
const wizard$ = of('🧙‍♂️');

// reaction / subscription:
autorun($ => console.log(`king gizzard & the ${$(lizard$)} ${$(wizard$)}`))

// OR with pipe and operators!
run($ =>
  `king gizzard & the ${$(lizard$)} ${$(wizard$)}`).pipe(/* operators */).subscribe(console.log)


// update atom:
lizard$.next('🐓');

Best of both worlds??

Glad you added distinctUntilChanged. And I think run could be better named computed or derived or autopiped 😨😝
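How an autorun with a `$` tracker could discover its dependencies can be sketched in a few lines. This is not the rxjs-autorun code: `Cell` is a hypothetical, dependency-free stand-in for a BehaviorSubject, `miniAutorun` is a made-up name, and unsubscription, async sources and distinct-value filtering are all ignored.

```typescript
// Minimal stand-in for a BehaviorSubject: holds a value, notifies on change.
class Cell<T> {
  private value: T;
  private listeners = new Set<() => void>();

  constructor(initial: T) {
    this.value = initial;
  }

  get(): T {
    return this.value;
  }

  next(value: T): void {
    this.value = value;
    this.listeners.forEach(listener => listener());
  }

  onChange(listener: () => void): void {
    this.listeners.add(listener);
  }
}

type Track = <T>(cell: Cell<T>) => T;

// Hypothetical autorun: runs `fn`, records which cells it read through `$`,
// and re-runs `fn` whenever one of those cells gets a new value.
function miniAutorun(fn: ($: Track) => void): void {
  const tracked = new Set<Cell<any>>();

  const $: Track = cell => {
    if (!tracked.has(cell)) {
      tracked.add(cell);
      cell.onChange(rerun); // subscribe once per cell
    }
    return cell.get();
  };

  function rerun(): void {
    fn($);
  }

  rerun(); // first run discovers the dependencies
}

const lizard = new Cell('🦎');
const wizard = new Cell('🧙‍♂️');
const lines: string[] = [];

miniAutorun($ => lines.push(`king gizzard & the ${$(lizard)} ${$(wizard)}`));
lizard.next('🐓');
// lines now holds two entries: the initial run and the re-run after next()
```

The dependency list is built as a side effect of calling `$`, so the function body itself declares what it depends on, much like the MobX example above.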

Totally agree with all three points!
And I love your examples — this gives some perspective! Thanks 👍

I've started this only because it was a fun dev challenge, I was sure as an API it's useless and error-prone! Then when it was ready... I began to have doubts 🤔😄

The shorter notation might make sense in templates like in <$> or Recks...

Probably it's a maker's affection of some sort: when you've been painting something all day long, and you look at it -- and it's crap, you know it's crap but still you kinda like it 😔

I think, I'll finish and post the state thing.
Maybe even include it into proxify lib (bad naming here too 🤦‍♂️)
Still not sure what to do w/ autorun — will polish it, then we'll see...


BTW, I've been sharing all this on twitter too, here's a thread
Víctor Oliva made another cool autorun concept — check it out!
(I'd have pinged you long ago, but I couldn't find you there)

and here's latest concept w/ state & autorun from twitter:

[image: state and autorun concepts]


Thanks for taking your time to look into this on Friday evening 🙂
Have a good weekend!


SATURDAY: okay... I've shared autorun on GitHub: github.com/kosich/rxjs-autorun (no npm package, naming is still a problem)

[image: autorun latest]