DEV Community

Valery C. Briz

Posted on • Originally published at Medium on

Reactive Programming in Python-Auth0

Why Reactive Programming?

In a way, reactive programming isn’t a new thing. A typical click event is already an asynchronous data stream that we can observe and use to trigger actions. Reactive Programming builds on that idea by adding a toolbox of operators to filter, create, transform, and combine any of those streams. In just a few lines of maintainable code, we can have web sockets that receive multiple requests and handle them in an asynchronous process that serves a filtered output.

Web applications contain lots of database operations, network calls, nested callbacks, and other computationally expensive tasks that might take a long time to complete (or even block other threads until they are done). This is where Reactive Programming comes in: it lets us convert almost anything into streams (variables, properties, user inputs, caches, etc.) and manage them asynchronously. It also gives us an easy way to handle errors, a task that is usually hard in asynchronous programming. Used well, Reactive Programming can make our code more flexible, readable, and maintainable.

What Does Reactive Programming Really Mean?

The main difference between Event-Driven programming and Reactive Programming is what actually triggers the action. While Event-Driven programming focuses on handling an event (such as a button click) to trigger the corresponding action, Reactive Programming wraps data into the reactive system as events. This enables us to do things like listening to user inputs as events that trigger actions only if the input has changed from the previous one.
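
The "only if the input changed" behaviour can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the RxPY API: the helper name distinct_until_changed and the sample inputs are invented for the example.

```python
def distinct_until_changed(values):
    """Yield a value only when it differs from the one just before it."""
    sentinel = object()  # marks "no previous value yet"
    previous = sentinel
    for value in values:
        if previous is sentinel or value != previous:
            yield value
        previous = value


# Hypothetical snapshots of a search box, taken on every key event.
inputs = ["p", "py", "py", "pyt", "pyt", "py"]
changes = list(distinct_until_changed(inputs))
print(changes)  # ['p', 'py', 'pyt', 'py']
```

Repeated snapshots are dropped, so a downstream action would run four times instead of six.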

“Reactive, in Reactive Programming, means a dynamic reaction to change in streams.”

Reactive Programming is a programming paradigm oriented around data flows and the propagation of change. This means that, when one component emits a change, the Reactive Programming library automatically propagates it to the other components until it reaches the final receiver.

How Does Reactive Programming Work?

Let’s take into consideration ReactiveX, the most famous implementation of the Reactive Programming paradigm. ReactiveX is mainly based on two classes: the Observable and Observer classes. The Observable class is the source of data streams or events and the Observer class is the one that consumes (or reacts to) the emitted elements.

Observables in Reactive Programming

An Observable packs the incoming data so it can be passed from one thread to another. The Observable can be configured to regulate when it supplies data. For example, it could emit periodically or only once in its life cycle. There are also various functions that can be used to filter or transform the Observable so the Observer only receives certain data. All this is used instead of callbacks, which means that our code becomes more readable and less error-prone.

from rx import Observable, Observer 

source = Observable.from_list([1,2,3,4,5,6])
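
To make the idea of filtering and transforming a stream concrete, here is a small stdlib-only sketch. ToyObservable and its methods are hypothetical stand-ins written for this example; they mimic the shape of the operators described above but are not RxPY's implementation.

```python
class ToyObservable:
    """Plain-Python stand-in for an Observable (hypothetical, not RxPY)."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    @classmethod
    def from_list(cls, items):
        def subscribe_fn(on_next):
            for item in items:
                on_next(item)
        return cls(subscribe_fn)

    def subscribe(self, on_next):
        self._subscribe_fn(on_next)

    def filter(self, predicate):
        # Returns a new observable that forwards only matching values.
        def subscribe_fn(on_next):
            def forward(value):
                if predicate(value):
                    on_next(value)
            self.subscribe(forward)
        return ToyObservable(subscribe_fn)

    def map(self, transform):
        # Returns a new observable that forwards transformed values.
        def subscribe_fn(on_next):
            self.subscribe(lambda value: on_next(transform(value)))
        return ToyObservable(subscribe_fn)


received = []
(ToyObservable.from_list([1, 2, 3, 4, 5, 6])
    .filter(lambda value: value % 2 == 0)
    .map(lambda value: value * 10)
    .subscribe(received.append))
print(received)  # [20, 40, 60]
```

Each operator returns a new observable instead of taking a callback, which is what keeps the chain flat and readable.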

It’s common to use Observables in such a way that they don’t emit data until an Observer subscribes to them. Known as "call by need" (lazy evaluation), this is an evaluation strategy that delays the evaluation of an event until its value is needed.
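
The "call by need" behaviour can be shown with a toy class in plain Python. LazySource and its log list are assumptions made for this sketch, not part of RxPY; the point is only that creating the source does no work, while subscribing does.

```python
class LazySource:
    """Toy source that produces values only when subscribed to (not RxPY)."""

    def __init__(self, items, log):
        self._items = items
        self._log = log  # records when production actually happens

    def subscribe(self, on_next):
        # The work happens here, at subscription time, not in __init__.
        self._log.append("producing")
        for item in self._items:
            on_next(item)


log = []
source = LazySource([1, 2, 3], log)
log_before_subscribe = list(log)  # still empty: nothing has run yet

received = []
source.subscribe(received.append)
print(log_before_subscribe, log, received)  # [] ['producing'] [1, 2, 3]
```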

Observers in Reactive Programming

The Observer consumes the data stream emitted by the Observable. An Observable can have multiple Observers, so each data item emitted will be received by each Observer. Listening to the stream is called subscribing. Observers subscribe to the Observable with the subscribe() method to receive the emitted data.

The Observer can receive three types of events:

  • on_next(): when there is an element in the data stream;
  • on_completed(): when no more items are coming (it implies end of emission);
  • on_error(): when there is an error thrown from the Observable (it also implies end of emission).

class PrintObserver(Observer):
    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))


We don’t have to specify all three event types in the code. We can choose which events to observe by using named arguments, or by simply providing a lambda for the on_next function. Typically, in production, we will want to provide an on_error handler so errors are explicitly handled by the subscriber.

source = Observable.from_list([1,2,3,4,5,6]) 
source.subscribe(lambda value: print("Received {0}".format(value)))
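
The contract between the three events can be sketched without any library. The emit helper and the readings generator below are hypothetical names used only for illustration; the sketch assumes, as described above, that an error ends the emission and that on_completed is then never called.

```python
def emit(source, on_next, on_error=None, on_completed=None):
    """Push items from an iterable to callbacks (toy helper, not RxPY)."""
    iterator = iter(source)
    while True:
        try:
            item = next(iterator)
        except StopIteration:
            break  # normal end of the stream
        except Exception as error:
            if on_error is None:
                raise  # no handler given: the error surfaces to the caller
            on_error(error)
            return  # an error ends the stream; on_completed is skipped
        on_next(item)
    if on_completed is not None:
        on_completed()


def readings():
    """Hypothetical data source that fails after two values."""
    yield 1
    yield 2
    raise ValueError("sensor offline")


events = []
emit(
    readings(),
    on_next=lambda value: events.append(("next", value)),
    on_error=lambda error: events.append(("error", str(error))),
    on_completed=lambda: events.append(("completed",)),
)
print(events)  # [('next', 1), ('next', 2), ('error', 'sensor offline')]
```

Leaving out on_error in this sketch lets the exception escape from emit, which is one reason to pass an explicit handler in production code.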

Subjects in Reactive Programming

The Subject is an extension of the Observable that also implements the Observer interface. That is, Subjects act like both Observers and Observables. They receive messages about events (like Observers) and notify their subscribers (like Observables). This implies two things:

  • We can subscribe to a Subject, just like an Observable.
  • A Subject can subscribe to other Observables.

Therefore, the main difference between a Subject and an Observable is that all of the Subject’s subscribers share the same execution. This means that, when a Subject produces data, all of its subscribers will receive the same data. This is unlike Observables, where each subscription causes an independent execution of the Observable.
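
The difference between independent and shared execution can be seen in a small plain-Python sketch. ToyColdObservable, ToySubject, and produce are hypothetical names invented for this example and are not RxPY classes.

```python
import itertools


class ToyColdObservable:
    """Runs its producer again for every subscription (toy, not RxPY)."""

    def __init__(self, producer):
        self._producer = producer

    def subscribe(self, on_next):
        for value in self._producer():
            on_next(value)


class ToySubject:
    """Forwards each value it receives to all current subscribers."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)

    def on_next(self, value):
        for observer in list(self._observers):
            observer(value)


counter = itertools.count(1)


def produce():
    # Each execution yields a fresh number, so executions are easy to tell apart.
    return [next(counter)]


cold = ToyColdObservable(produce)
first, second = [], []
cold.subscribe(first.append)   # first execution  -> 1
cold.subscribe(second.append)  # second execution -> 2

subject = ToySubject()
third, fourth = [], []
subject.subscribe(third.append)
subject.subscribe(fourth.append)
cold.subscribe(subject.on_next)  # one execution -> 3, shared by both

print(first, second, third, fourth)  # [1] [2] [3] [3]
```

The last call also shows the second bullet above: the Subject itself is subscribed to another Observable and relays what it receives.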

A Subscriber can not only subscribe to an Observable but also unsubscribe from it. It is important to remember to unsubscribe from asynchronous calls. When calling unsubscribe(), all the operators unsubscribe from one another in sequence from top to bottom, which helps us avoid memory leaks. In the case of Subjects, we call dispose() on the object returned by subscribe(), which can be thought of as the subscription itself, or perhaps a token representing the subscription. Disposing of it ends the subscription. The call can be placed in the socket on_close() method:

def on_close(self): 
    print("WebSocket closed")
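
As a rough sketch of where the dispose call could sit, the following stdlib-only example wires a toy subscription into an on_close() method. ToyDisposable, ToySubject, ToySocketHandler, and the subscription attribute are all assumptions made for illustration; the article's real handler and subscription names are not shown in this excerpt.

```python
class ToyDisposable:
    """Token representing one subscription (toy stand-in, not RxPY)."""

    def __init__(self, action):
        self._action = action
        self._disposed = False

    def dispose(self):
        if not self._disposed:  # disposing twice is harmless
            self._disposed = True
            self._action()


class ToySubject:
    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)
        return ToyDisposable(lambda: self._observers.remove(on_next))

    def on_next(self, value):
        for observer in list(self._observers):
            observer(value)


class ToySocketHandler:
    """Hypothetical handler; only on_close() mirrors the article's snippet."""

    def open(self, subject):
        self.messages = []
        self.subscription = subject.subscribe(self.messages.append)

    def on_close(self):
        self.subscription.dispose()  # stop receiving before the socket goes away
        print("WebSocket closed")


subject = ToySubject()
handler = ToySocketHandler()
handler.open(subject)
subject.on_next("hello")
handler.on_close()
subject.on_next("ignored")  # no longer delivered to the closed handler
print(handler.messages)  # ['hello']
```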
