Enea Jahollari for This is Angular

Posted on • Originally published at eneajahollari.Medium

Async pipe is not pure 🤯

Yes, that’s normal! Why? Because of how Angular does change detection under the hood!

To better understand how the async pipe works, let’s create our own from scratch!

Let’s call it SubscribePipe to distinguish it from the original one.

We also want our pipe to meet these requirements:

  • Work with Observables (we won’t handle Promises in this post)
  • No over-subscribing
  • Work with OnPush change detection
  • No memory leaks

And we want to use it like this:

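import { CommonModule } from '@angular/common';
import { Component } from '@angular/core';
import { interval } from 'rxjs';
import { tap } from 'rxjs/operators';
// SubscribePipe is the pipe we build below; import it from wherever you define it

@Component({
  selector: 'my-app',
  template: `
    <div *ngIf="show">
      Value: {{ obs$ | subscribe }}
    </div>
  `,
  standalone: true,
  imports: [CommonModule, SubscribePipe],
  // changeDetection: ChangeDetectionStrategy.OnPush // <-- after we handle it we should uncomment this
})
export class AppComponent {
  show = true;

  obs$ = interval(500).pipe(
    tap((x) => {
      if (x === 10) {
        this.show = false;
      }
    })
  );
}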

Create a pipe (standalone)

import { Pipe, PipeTransform } from '@angular/core';

@Pipe({
  name: 'subscribe',
  standalone: true
})
export class SubscribePipe implements PipeTransform {
  transform() {}
}

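We want our pipe to accept an Observable, which means we need to handle Observables, Subjects, BehaviorSubjects and ReplaySubjects.

Let’s create a generic Subscribable type that includes all of them. (Subject, BehaviorSubject and ReplaySubject all extend Observable, so Observable<T> alone would accept them too; the union just spells out the intent.)

import { BehaviorSubject, Observable, ReplaySubject, Subject } from 'rxjs';

type Subscribable<T> = Observable<T> | Subject<T> | BehaviorSubject<T> | ReplaySubject<T>;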

Now that we know what our pipe accepts, let’s refactor the pipe to use it! Because we want to infer the type of the subscribable, we will make the pipe class generic.

One more thing: the pipe should also accept null or undefined.

export class SubscribePipe<T> implements PipeTransform {
  transform(obs: Subscribable<T> | null | undefined) {}
}

Subscription handling

First, we need to check whether the observable is null or undefined. If it is, we just return null.

transform(obs: Subscribable<T> | null): T | null {
  if (!obs) {
    return null;
  }

  return null; // placeholder until we subscribe in the next step
}

Now, let’s subscribe to the observable, store its latest value and return it directly.

latestValue: T | null = null;

transform(obs: Subscribable<T> | null): T | null {
  if (!obs) {
    return null;
  }

  obs.subscribe(value => {
    this.latestValue = value;
  });

  return this.latestValue;
}

This won’t work! Why?

Because pipes are pure by default: when change detection runs, Angular checks the arguments passed to the transform method, and if they haven’t changed, it returns the value it cached before instead of calling transform again. Our observable reference never changes, so we would keep getting the first value we returned.

I’ve explained this in more depth in my previous post, It’s ok to use function calls in Angular templates!, where I show how pipe memoization works and how we can do the same thing with normal functions.
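
To make that caching concrete, here is a minimal, framework-free sketch of memoizing by argument identity. memoizeLast and source are hypothetical names made up for this illustration, and this is not Angular’s actual implementation; it only mirrors the idea that a pure pipe re-runs transform only when its input reference changes.

```typescript
// Hypothetical helper (not Angular's code): caches the result for the last
// argument it saw and only re-runs `fn` when the argument identity changes.
function memoizeLast<A, R>(fn: (arg: A) => R): (arg: A) => R {
  let hasRun = false;
  let lastArg: A | undefined;
  let lastResult: R | undefined;

  return (arg: A): R => {
    if (!hasRun || arg !== lastArg) {
      hasRun = true;
      lastArg = arg;
      lastResult = fn(arg);
    }
    return lastResult as R;
  };
}

// A stand-in for an observable-like source whose current value changes over time.
const source = { current: 0 };

let transformCalls = 0;
const pureTransform = memoizeLast((src: { current: number }) => {
  transformCalls++;
  return src.current; // read whatever value is available right now
});

const first = pureTransform(source);  // runs the function and caches 0
source.current = 42;                  // the "observable" emits a new value
const second = pureTransform(source); // same reference, so the cached 0 comes back
```

Because source keeps the same reference, the second call never sees 42. That is exactly the situation our pipe is in while it is pure.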

This is the moment where we opt out of pipe memoization by using the pure: false flag.

@Pipe({
  name: 'subscribe',
  standalone: true,
  pure: false // <-- It is true by default
})

The moment we set the pure option to false, Angular calls the transform method on every change detection run, and we have to handle the memoization ourselves.

If we run the code we will see something like this:

[Image: Not working solution]

The reason is that every time change detection runs, the pipe subscribes to our observable again and leaves the previous subscription in memory, which of course creates a memory leak!
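
We can reproduce that over-subscription without Angular. The sketch below uses a hand-rolled CountingSource (a hypothetical stand-in for an observable, not an RxJS class) and calls the naive transform once per simulated change detection run.

```typescript
// Hypothetical stand-in for an observable that counts its active subscribers.
class CountingSource<T> {
  activeSubscribers = 0;
  private listeners = new Set<(value: T) => void>();

  subscribe(next: (value: T) => void): { unsubscribe: () => void } {
    this.listeners.add(next);
    this.activeSubscribers++;
    return {
      unsubscribe: () => {
        if (this.listeners.delete(next)) {
          this.activeSubscribers--;
        }
      },
    };
  }

  emit(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// Same logic as the naive transform above: subscribe on every call.
class NaivePipe<T> {
  latestValue: T | null = null;

  transform(obs: CountingSource<T> | null): T | null {
    if (!obs) {
      return null;
    }
    obs.subscribe((value) => {
      this.latestValue = value;
    });
    return this.latestValue;
  }
}

const ticks = new CountingSource<number>();
const naivePipe = new NaivePipe<number>();

// Simulate five change detection runs of an impure pipe.
for (let run = 0; run < 5; run++) {
  naivePipe.transform(ticks);
}

// One subscription per run, and none of them was ever released.
const leakedSubscribers = ticks.activeSubscribers;
```

Five runs leave five live subscriptions behind, and the number keeps growing for as long as change detection keeps running.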

How can we fix that? By doing a simple equality check!

We will save the current observable reference, and every time the transform method is called, we will check whether the incoming observable is equal to the saved one. If it is, we just return the latest value.

private currentObs: Subscribable<T> | null = null;

transform(obs: Subscribable<T> | null): T | null {
  if (!obs) {
    return null;
  }

  if (obs === this.currentObs) { // <-- simple equality check
    return this.latestValue;
  } else {
    this.currentObs = obs; // <-- save current observable to a class field

    obs.subscribe((value) => {
      this.latestValue = value;
    });
  }

  return this.latestValue;
}

If we check the app now, we will see that it works fine! But it’s not finished!

[Image: Working solution]

But we still cause a memory leak, because we never unsubscribe from the observable! Let’s fix that!

Unsubscription handling

Just like we stored the latestValue and currentObs, we will also store the current subscription and assign the observable subscription to it!

private sub: Subscription | null = null;

transform(obs: Subscribable<T> | null): T | null {
  ...
  this.sub = obs.subscribe((value) => {
    this.latestValue = value;
  });
  ...
}

Good! Now we need to unsubscribe in ngOnDestroy, but not only there 💡. Why? Because we also need to unsubscribe when the observable reference changes to another observable or is set to null.

For example:

@Component({
  selector: 'my-app',
  template: `
    <div *ngIf="show">{{ obs$ | subscribe }}</div>
  `,
  standalone: true,
  imports: [CommonModule, SubscribePipe],
})
export class AppComponent {
  show = true;

  ngOnInit() {
    setTimeout(() => {
      this.obs$ = of(20000);
    }, 2000);

    setTimeout(() => {
      this.obs$ = null;
    }, 4000);
  }

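  // typed explicitly so it can be reassigned to another observable or to null
  obs$: Observable<number> | null = interval(500).pipe(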
    tap((x) => {
      if (x === 10) {
        this.show = false;
      }
    })
  );
}

So we need to dispose of the subscription in these cases too! Otherwise, we cause a memory leak!

Let’s create a dispose() method that has the unsubscription logic in order to re-use it!

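private dispose() {
  if (this.sub) { // <-- first we check if we have a subscription
    this.sub.unsubscribe(); // <-- unsubscribe from the observable
    this.sub = null; // <-- remove the subscription reference
  }
  // also reset the cached state, so we never return a stale value and we
  // subscribe again if the same observable comes back after a null
  this.latestValue = null;
  this.currentObs = null;
}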

Now, let’s use this method!

First, on ngOnDestroy() and then in the other cases mentioned above!

ngOnDestroy() {
  this.dispose();
}
transform(obs: Subscribable<T> | null): T | null {
  if (!obs) {
    this.dispose(); // <-- if we have a current sub and change the obs to be null we need to dispose it
    return null;
  }

  if (obs === this.currentObs) {
    return this.latestValue;
  } else {
    this.dispose(); // <-- before subscribing to a new observable, we need to dispose the existing one

    this.currentObs = obs;

    this.sub = obs.subscribe((value) => {
      this.latestValue = value;
    });
  }

  return this.latestValue;
}

If we check the app, it still works without any issue! And now without memory leaks 🎉 yay!
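
A quick way to convince ourselves is to model the pipe without Angular and count active subscribers. TrackedSource and SketchPipe below are hypothetical stand-ins written for this check (no RxJS involved, and destroy() plays the role of ngOnDestroy). As an assumption of this sketch, dispose() also clears the cached value and reference, so a stale value is never returned.

```typescript
type Teardown = { unsubscribe: () => void };

// Hypothetical stand-in for an observable that counts active subscribers.
class TrackedSource<T> {
  activeSubscribers = 0;
  private listeners = new Set<(value: T) => void>();

  subscribe(next: (value: T) => void): Teardown {
    this.listeners.add(next);
    this.activeSubscribers++;
    return {
      unsubscribe: () => {
        if (this.listeners.delete(next)) {
          this.activeSubscribers--;
        }
      },
    };
  }

  emit(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// Framework-free model of the pipe logic from this section.
class SketchPipe<T> {
  private latestValue: T | null = null;
  private currentObs: TrackedSource<T> | null = null;
  private sub: Teardown | null = null;

  transform(obs: TrackedSource<T> | null): T | null {
    if (!obs) {
      this.dispose();
      return null;
    }
    if (obs !== this.currentObs) {
      this.dispose(); // release the previous source before switching
      this.currentObs = obs;
      this.sub = obs.subscribe((value) => {
        this.latestValue = value;
      });
    }
    return this.latestValue;
  }

  destroy(): void {
    this.dispose();
  }

  private dispose(): void {
    if (this.sub) {
      this.sub.unsubscribe();
      this.sub = null;
    }
    // Assumption of this sketch: also drop the cached state.
    this.latestValue = null;
    this.currentObs = null;
  }
}

const sourceA = new TrackedSource<number>();
const sourceB = new TrackedSource<number>();
const sketchPipe = new SketchPipe<number>();

// Three change detection runs with the same reference: one subscription.
sketchPipe.transform(sourceA);
sketchPipe.transform(sourceA);
sketchPipe.transform(sourceA);
const subscribersAfterRepeats = sourceA.activeSubscribers;

sourceA.emit(7);
const valueAfterEmit = sketchPipe.transform(sourceA);

// Switching the reference releases A and subscribes to B.
sketchPipe.transform(sourceB);
const subscribersAfterSwitch = [sourceA.activeSubscribers, sourceB.activeSubscribers];

// Passing null releases B as well.
sketchPipe.transform(null);
const subscribersAfterNull = sourceB.activeSubscribers;

// Subscribing again and then destroying the pipe leaves nothing behind.
sketchPipe.transform(sourceA);
sketchPipe.destroy();
const subscribersAfterDestroy = sourceA.activeSubscribers;
```

At every step there is at most one live subscription, and it always belongs to the observable that was passed in last.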

Are we done? Not yet, because our pipe doesn’t work with OnPush ChangeDetection!

In your editor, try enabling OnPush change detection on your component and check the app! It won’t show anything!

But, how can we fix it? Just like any other time when we try to fix change detection issues 😈😄, put a cdr.markForCheck() after you update the value and we’re done!

private cdr = inject(ChangeDetectorRef); // <-- inject CDRef here

transform(obs: Subscribable<T> | null): T | null {
  ...
  this.sub = obs.subscribe((value) => {
    this.latestValue = value;
    this.cdr.markForCheck(); // <-- mark the component as dirty here, after we have updated the latestValue
  });
  ...
}

That’s it!
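
If you wonder why that single call makes the difference, here is a toy model. ToyOnPushView is a made-up class for illustration and heavily simplifies what Angular does internally; it only assumes that a change detection pass skips an OnPush view unless something has marked it dirty.

```typescript
// Toy model of an OnPush view (an assumption for illustration, not Angular's
// internals): a change detection pass re-renders the view only if it is dirty.
class ToyOnPushView {
  rendered: number | null = null;
  private dirty = true; // a new view is always checked once
  private readValue: () => number | null;

  constructor(readValue: () => number | null) {
    this.readValue = readValue;
  }

  markForCheck(): void {
    this.dirty = true;
  }

  // Stands in for one change detection pass over this view.
  detectChanges(): void {
    if (!this.dirty) {
      return; // skip views that nobody marked dirty
    }
    this.rendered = this.readValue();
    this.dirty = false;
  }
}

// Plays the role of the pipe's latestValue field.
let latestValue: number | null = null;
const view = new ToyOnPushView(() => latestValue);

view.detectChanges(); // first pass renders null

latestValue = 1;      // the observable emits, but nobody marks the view
view.detectChanges();
const withoutMark = view.rendered; // the view was skipped, still null

latestValue = 2;      // the observable emits again ...
view.markForCheck();  // ... and this time the pipe marks the view
view.detectChanges();
const withMark = view.rendered;    // the new value is rendered
```

Without the mark, the new value sits in latestValue but never reaches the screen; with it, the next pass picks it up.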

Code refactoring

Let’s move the subscription handling into a method (and also rethrow when we get an error), set cdr and currentObs to null in ngOnDestroy (to drop their references and avoid any memory leak), and restructure the transform method for better readability.

private subscribe(obs: Subscribable<T>) {
  this.currentObs = obs;

  this.sub = obs.subscribe({
    next: (res) => {
      this.latestValue = res;
      this.cdr?.markForCheck(); // <-- cdr is set to null in ngOnDestroy, hence the ?.
    },
    error: (error) => {
      throw error;
    },
  });
}
transform(obs: Subscribable<T> | null): T | null {
  if (!obs) {
    this.dispose();
    return null;
  }

  // here we check if the obs are not the same instead of checking if they are the same
  if (obs !== this.currentObs) {
    this.dispose();
    this.subscribe(obs); // <-- use the method we extracted above
  }

  return this.latestValue;
}
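ngOnDestroy() {
  this.dispose();
  // for this to compile in strict mode, declare the field as
  // `private cdr: ChangeDetectorRef | null` and call `this.cdr?.markForCheck()`
  this.cdr = null;
  this.currentObs = null;
}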

Recap

Find the whole source code here.

We have handled almost all the cases that Angular’s async pipe handles, except Promises (but that can be added easily too)! We also understood why Angular’s async pipe is not pure, and why that is not an issue at all!
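
For the Promise case, one possible approach (a sketch, not necessarily how Angular’s own pipe does it) is to give a Promise the same subscribe/unsubscribe shape we already use. subscribeToPromise is a hypothetical helper; since a Promise cannot be cancelled, "unsubscribing" here just means ignoring a late result. Rejections are not handled in this sketch.

```typescript
// Hypothetical adapter (an assumption of this sketch): wraps a Promise so it
// can be subscribed to and "unsubscribed" from like our observables.
function subscribeToPromise<T>(
  promise: Promise<T>,
  next: (value: T) => void
): { unsubscribe: () => void } {
  let active = true;

  promise.then((value) => {
    if (active) {
      next(value); // deliver the value only if nobody unsubscribed meanwhile
    }
  });

  return {
    unsubscribe: () => {
      active = false; // a Promise cannot be cancelled, so we just ignore it
    },
  };
}

const received: string[] = [];

subscribeToPromise(Promise.resolve('kept'), (value) => {
  received.push(value);
});

const dropped = subscribeToPromise(Promise.resolve('dropped'), (value) => {
  received.push(value);
});
dropped.unsubscribe(); // e.g. the pipe was destroyed before the Promise resolved
```

In the pipe, transform would pick this adapter when it receives a Promise and keep the existing subscribe path for observables; dispose() would work unchanged.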

I hope you liked this post and learned something from it!

--

I tweet a lot about Angular (latest news, videos, podcasts, updates, RFCs, pull requests and so much more). If you’re interested in it, give me a follow at @Enea_Jahollari. Give me a follow on Dev.to if you liked this article and want to see more like this!

Thank you for reading!

Top comments (4)

chan 🤖

Awesome writeup.

One question: is obs.subscribe() an asynchronous or a synchronous method?
If it is asynchronous, wouldn't transform return before the emitted value has been assigned to this.latestValue?

Enea Jahollari

Hello, good question!

In fact, obs.subscribe() depends on the observable itself.

So, if the observable is something like: of('Hello world!'), it will emit synchronously and the value we get is "Hello world" directly.
But if the observable emits asynchronously, for example an HTTP call, the pipe first returns null, just like the async pipe, which returns null by default.

chan 🤖

Got that. There's one thing that isn't clear to me yet.

If the observable reference that is passed to subscribe pipe doesn't change and that observable emits a new value, it will get assigned to this.latestValue because of obs.subscribe() inside the subscribe pipe but it has no return statement.

The question is how is the latest emitted value from the observable returned from subscribe pipe in the event the observable is not changed. I know it simply returns whatever is in this.latestValue, but what if no change detection runs that time.
I know it uses markForCheck but it was added specifically for onPush statement, and it worked for the default changing strategy.

In summary, i'm thinking of a situation where no new Observable reference (but emits a new value) + no change detection runs, how does the custom subscribe pipe return the latest value?

Hopefully, i was able to convey clearly where my confusion is. Thanks in advance.

Enea Jahollari


The pipe's transform method is called on every change detection run, but latestValue is set every time the observable emits, and that has nothing to do with change detection.

When change detection runs and the observable hasn't changed its reference, we just return whatever latestValue currently holds, which is the most recently emitted value.

Hope it's more clear.