DEV Community

Christopher Zhong

How to rewrite a promise-based function with loops to RxJS observables

It is relatively simple to make a promise-based function return an RxJS observable. Consider the following function definition.

declare function doSomething<T>(): Promise<T>;

Rename the doSomething function to something like doSomethingPromise. Then create a new doSomething function that wraps the call to doSomethingPromise with the RxJS from function, as shown in the following example.

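import { from, Observable } from 'rxjs';

// The original promise-based function, renamed.
declare function doSomethingPromise<T>(): Promise<T>;

// The new function keeps the old name but returns an observable.
function doSomething<T>(): Observable<T> {
  return from(doSomethingPromise<T>());
}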
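One detail worth keeping in mind when wrapping with from: the promise-based function is called as soon as the wrapper is called, not when something subscribes. The following is a minimal sketch of that difference, assuming RxJS 7. The call counter and the doSomethingEager and doSomethingLazy names are hypothetical and only serve the illustration; defer is a standard RxJS function that postpones the call until subscription.

```typescript
import { defer, from, Observable } from 'rxjs';

// Hypothetical stand-in for a promise-based function; it counts its calls.
let calls = 0;
async function doSomethingPromise(): Promise<number> {
  calls += 1;
  return calls;
}

// Eager: doSomethingPromise runs as soon as this wrapper is called.
function doSomethingEager(): Observable<number> {
  return from(doSomethingPromise());
}

// Lazy: doSomethingPromise runs once for every subscription.
function doSomethingLazy(): Observable<number> {
  return defer(() => doSomethingPromise());
}

const eager$ = doSomethingEager();
const callsAfterEager = calls; // the promise function has already run

const lazy$ = doSomethingLazy();
const callsAfterLazy = calls; // unchanged, nothing has subscribed to lazy$ yet
```

Whether the eager or the lazy behavior is preferable depends on the caller; the rest of this post uses from, as in the example above.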

However, what if you want to rewrite the body of a promise-based function with RxJS operators instead of just wrapping it? In simple cases this is relatively straightforward, but less so if the function involves loops.

Context

It is easier to illustrate with an example. Suppose you have a graph-like data structure (as shown below) in which each node may point to a parent, and you want to merge the values such that the values in the "child" take precedence over those of its ancestors.

interface Node {
  id: string;
  parentID?: string;
  values: KeyValue[];
}

interface KeyValue {
  key: string;
  value: string;
}

Here is some example data.

const value3: Node = {
  id: 'id-1',
  values: [
    { key: 'key-1', value: 'value-1 from value3' },
    { key: 'key-2', value: 'value-2 from value3' },
    { key: 'key-3', value: 'value-3 from value3' },
    { key: 'key-4', value: 'value-4 from value3' },
  ],
};
const value2: Node = {
  id: 'id-2',
  parentID: value3.id,
  values: [
    { key: 'key-1', value: 'value-1 from value2' },
    { key: 'key-2', value: 'value-2 from value2' },
    { key: 'key-3', value: 'value-3 from value2' },
  ],
};
const value1: Node = {
  id: 'id-3',
  parentID: value2.id,
  values: [
    { key: 'key-1', value: 'value-1 from value1' },
    { key: 'key-2', value: 'value-2 from value1' },
  ],
};

The merged values should be as follows.

const merged = [
  { key: 'key-1', value: 'value-1 from value1' },
  { key: 'key-2', value: 'value-2 from value1' },
  { key: 'key-3', value: 'value-3 from value2' },
  { key: 'key-4', value: 'value-4 from value3' },
];
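To make the precedence rule concrete, here is a minimal synchronous sketch that assumes all nodes are already in memory. The nodesById map, the mergeInMemory function, and the shortened sample data are hypothetical and not part of the original code. The interface is named GraphNode here only to avoid clashing with the DOM's global Node type.

```typescript
interface KeyValue {
  key: string;
  value: string;
}

interface GraphNode {
  id: string;
  parentID?: string;
  values: KeyValue[];
}

// Hypothetical in-memory store standing in for the database.
const nodesById = new Map<string, GraphNode>([
  ['grandparent', {
    id: 'grandparent',
    values: [
      { key: 'a', value: 'a from grandparent' },
      { key: 'b', value: 'b from grandparent' },
      { key: 'c', value: 'c from grandparent' },
    ],
  }],
  ['parent', {
    id: 'parent',
    parentID: 'grandparent',
    values: [
      { key: 'a', value: 'a from parent' },
      { key: 'b', value: 'b from parent' },
    ],
  }],
  ['child', {
    id: 'child',
    parentID: 'parent',
    values: [{ key: 'a', value: 'a from child' }],
  }],
]);

// Walks from the given node up to the root; the first value seen for a key wins.
function mergeInMemory(start: GraphNode): KeyValue[] {
  const seenKeys = new Set<string>();
  const values: KeyValue[] = [];
  let current: GraphNode | undefined = start;
  while (current !== undefined) {
    for (const keyValue of current.values) {
      if (!seenKeys.has(keyValue.key)) {
        seenKeys.add(keyValue.key);
        values.push(keyValue);
      }
    }
    current =
      current.parentID === undefined
        ? undefined
        : nodesById.get(current.parentID);
  }
  return values;
}

const mergedValues = mergeInMemory(nodesById.get('child')!);
// mergedValues holds 'a from child', 'b from parent', 'c from grandparent', in that order.
```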

You may be thinking that the logic to merge the values does not need to be asynchronous. But suppose that the data is retrieved from a database through an ORM library such as Prisma, whose query functions are asynchronous. The following is an example.

const prisma = new PrismaClient();
// findUnique returns a promise that resolves to the node, or to null if no record matches
const node = await prisma.node.findUnique({
  include: { values: true },
  where: { id: 'id-3' },
});

Then suppose the merge function is as follows.

async function merge(node: Node): Promise<Node> {
  let parent: Node | null = node;
  // Keys already taken from a node closer to the child.
  const valuesMap = new Map<string, string>();
  const values = [] as KeyValue[];
  // Walk up the chain of parents until a node has no parent.
  while (parent != null) {
    parent.values.reduce(
      ({ valuesMap, values }, keyValue) => {
        // The first occurrence of a key wins, so the child overrides its ancestors.
        if (!valuesMap.has(keyValue.key)) {
          valuesMap.set(keyValue.key, keyValue.value);
          values.push(keyValue);
        }
        return { valuesMap, values };
      },
      { valuesMap, values },
    );
    const parentID: string | undefined = parent.parentID;
    // Fetch the next parent, or stop when the current node has none.
    parent =
      parentID == null
        ? null
        : await prisma.node.findUnique({
            include: { values: true },
            where: { id: parentID },
          });
  }
  return { ...node, values };
}

Because the findUnique function returns a promise (which resolves to the node, or to null if nothing matches), each lookup inside the loop has to be awaited, which means that the merge function has to be declared async.

Observables

How do I rewrite the merge function to use observable operators?

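First, we create an observable from the node parameter with of so that we can start piping operators. The while loop can be replicated using the expand operator, which feeds every emitted value back into its project function. We wrap the findUnique call with the from function to turn its promise into an observable, since we cannot change the Prisma library to return observables. When parentID is null, the project function returns an empty observable (of() with no arguments), which ends the recursion and lets the stream complete.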
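As a standalone illustration of how expand can play the role of a loop, here is a minimal sketch that is unrelated to Prisma and assumes RxJS 7.2 or later (where operators can be imported from 'rxjs'). It counts from 1 to 5; the collected array is a hypothetical name used only for this example, and EMPTY is the RxJS constant for an observable that completes without emitting.

```typescript
import { EMPTY, expand, of } from 'rxjs';

const collected: number[] = [];

of(1)
  .pipe(
    // Each emitted value is passed back into this function.
    // Returning EMPTY is the equivalent of the loop condition becoming false.
    expand((n) => (n < 5 ? of(n + 1) : EMPTY)),
  )
  .subscribe((n) => collected.push(n));

// Everything here is synchronous, so collected is now [1, 2, 3, 4, 5].
```

Note that the seed value (1) is emitted as well, which is why the node passed to merge also ends up in the stream.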

Then we use the filter operator to remove nulls and the map operator to extract the values of each node.

Next, we use the reduce operator to merge the values from each node. It is similar to the Array's reduce function.

Finally, the last map operator returns the original node with the merged values. The following is a complete example.

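// Operators are exported from 'rxjs' in RxJS 7.2+; older versions import them from 'rxjs/operators'.
import { expand, filter, from, map, Observable, of, reduce } from 'rxjs';

function merge(node: Node): Observable<Node> {
  return of(node).pipe(
    // Runs for every emitted node: fetch its parent, or stop when there is none.
    expand((current) => {
      const parentID = current?.parentID;
      return parentID == null
        ? of()
        : from(
            prisma.node.findUnique({
              include: { values: true },
              where: { id: parentID },
            }),
          );
    }),
    // findUnique resolves to null when no record matches, so drop those.
    filter(Boolean),
    map((current) => {
      return current.values;
    }),
    // Child values arrive first, so the first occurrence of a key wins.
    reduce(
      ({ valuesMap, values }, keyValues) => {
        keyValues.forEach((keyValue) => {
          if (!valuesMap.has(keyValue.key)) {
            valuesMap.set(keyValue.key, keyValue.value);
            values.push(keyValue);
          }
        });
        return { valuesMap, values };
      },
      {
        valuesMap: new Map<string, string>(),
        values: [] as KeyValue[],
      },
    ),
    // `node` is the original argument here, not one of its parents.
    map(({ values }) => {
      return { ...node, values };
    }),
  );
}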
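To try the observable version without a database, the pipeline can be run against an in-memory stand-in. The following is a sketch under stated assumptions, not the code from my project: it assumes RxJS 7.2 or later, and the store map, the findNode function (a hypothetical replacement for prisma.node.findUnique), the mergeWithStore name, and the GraphNode interface (renamed to avoid the DOM's Node type) are all made up for this example. It also uses EMPTY and a Set in place of of() and the Map.

```typescript
import { EMPTY, expand, filter, firstValueFrom, from, map, Observable, of, reduce } from 'rxjs';

interface KeyValue {
  key: string;
  value: string;
}

interface GraphNode {
  id: string;
  parentID?: string;
  values: KeyValue[];
}

// Hypothetical in-memory data standing in for the database.
const store = new Map<string, GraphNode>([
  ['root', {
    id: 'root',
    values: [
      { key: 'a', value: 'a from root' },
      { key: 'b', value: 'b from root' },
    ],
  }],
  ['leaf', {
    id: 'leaf',
    parentID: 'root',
    values: [{ key: 'a', value: 'a from leaf' }],
  }],
]);

// Hypothetical replacement for prisma.node.findUnique: resolves to the node or null.
async function findNode(id: string): Promise<GraphNode | null> {
  return store.get(id) ?? null;
}

function mergeWithStore(node: GraphNode): Observable<GraphNode> {
  return of<GraphNode | null>(node).pipe(
    // Fetch the parent of each emitted node; stop when there is no parent.
    expand((current) => {
      const parentID = current?.parentID;
      return parentID == null ? EMPTY : from(findNode(parentID));
    }),
    // Drop the null emitted when a parent ID has no matching node.
    filter((current): current is GraphNode => current !== null),
    // The first value seen for a key wins, so the child overrides its parents.
    reduce(
      (acc, current) => {
        for (const keyValue of current.values) {
          if (!acc.seenKeys.has(keyValue.key)) {
            acc.seenKeys.add(keyValue.key);
            acc.values.push(keyValue);
          }
        }
        return acc;
      },
      { seenKeys: new Set<string>(), values: [] as KeyValue[] },
    ),
    map(({ values }) => ({ ...node, values })),
  );
}

// firstValueFrom converts the single emitted result back into a promise.
firstValueFrom(mergeWithStore(store.get('leaf')!)).then((merged) => {
  console.log(merged.values.map((keyValue) => keyValue.value));
  // logs [ 'a from leaf', 'b from root' ]
});
```

Nothing is fetched until something subscribes to the observable returned by mergeWithStore; here firstValueFrom does the subscribing.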

Conclusion

It took me a while to figure out how to write "loops" with RxJS. Since I'm fairly new to the RxJS library, it wasn't obvious what was needed. I even tried asking ChatGPT to convert the function to use observables, and the results were disastrous.

I hope that this post can provide a little clarity to those who are fairly new to RxJS.
