DEV Community

Salad Lam
Salad Lam

Posted on

Project Reactor: About Fuseable interface SYNC mode

About reactor.core.Fuseable interface

Main reason of introduces Fuseable interface is to speed up data flow in operator chain. Here is the discussion on that.

Example SYNC mode implementation

Below is an example to display the signal received by implements both Fuseable SYNC mode and traditional Reactive Streams's request and onNext cycle. Project Reactor 3.6.3 is used.

package example;

import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;

import java.util.Objects;

public class SyncDisplaySubscriber<T> implements CoreSubscriber<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SyncDisplaySubscriber.class);
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        LOGGER.info("onSubscribe(): s={}", s);
        subscription = s;
        if (s instanceof Fuseable.QueueSubscription) { // (1)
            @SuppressWarnings("unchecked")
            Fuseable.QueueSubscription<T> qs = (Fuseable.QueueSubscription<T>) s;
            int mode = qs.requestFusion(Fuseable.SYNC); // (2)
            LOGGER.info("onSubscribe(): requestFusion()={}", mode);
            if (mode == Fuseable.SYNC) { // (3)
                drain();
                return;
            }
        }
        s.request(1L);
    }

    @Override
    public void onNext(T t) {
        LOGGER.info("onNext(): t={}", t);
        subscription.request(1L);
    }

    @Override
    public void onError(Throwable t) {
        LOGGER.info("onError()", t);
    }

    @Override
    public void onComplete() {
        LOGGER.info("onComplete()");
    }

    private void drain() {
        @SuppressWarnings("unchecked")
        Fuseable.QueueSubscription<T> qs = (Fuseable.QueueSubscription<T>) subscription;
        try {
            T next;
            do {
                next = qs.poll(); // (4)
                if (Objects.nonNull(next)) {
                    LOGGER.info("drain(): next={}", next); // (5)
                } else {
                    LOGGER.info("drain(): complete"); // (6)
                }
            } while (Objects.nonNull(next));
        } catch (Exception t) {
            LOGGER.info("drain(): error", t); // (7)
        }
    }

}
Enter fullscreen mode Exit fullscreen mode

(1): the Subscription from upstream operator must implement Fuseable.QueueSubscription, otherwise Fuseable is not supported.

(2): Using Fuseable.QueueSubscription.requestFusion(Fuseable.SYNC) method to check if SYNC mode is supported or not.

(3): SYNC mode is supported only if the return is Fuseable.SYNC. In SYNC mode it is not necessarily to call Subscription.request(long). If Fuseable.NONE is return, the subscriber will get data by Reactive Streams's request and onNext cycle described by here.

(4): To get a value by calling Fuseable.QueueSubscription.poll(). SYNC mode means that triggers release a value from source.

(5): If the result is not null, it means that onNext signal is received with that value.

(6): If the result is null, it means that the onComplete signal is received.

(7): May get an exception when calling Fuseable.QueueSubscription.poll(). It means that onError signal is received with that exception.

Another class DisplaySubscriber is nearly the same as SyncDisplaySubscriber, except it implements org.reactivestreams.Subscriber interface rather than
reactor.core.CoreSubscriber interface.

// Implementation of this class is equal to SyncDisplaySubscriber except implements org.reactivestreams.Subscriber interface
public class DisplaySubscriber<T> implements Subscriber<T> {
    // ...
}
Enter fullscreen mode Exit fullscreen mode

Method Flux.just() returns a FluxArray instance, and its Subscription class ArraySubscription supports Fuseable.SYNC mode. We use this operator to observes the data flow of subscribers.

package example;

import reactor.core.publisher.Flux;

public class FuseableSyncModeTest {

    public static void main(String[] args) {
        Flux<Integer> flux1 = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux1.subscribe(new SyncDisplaySubscriber<>());

        Flux<Integer> flux2 = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux2.subscribe(new DisplaySubscriber<>());
    }

}
Enter fullscreen mode Exit fullscreen mode

The output is

10:55:23.904 [main] INFO  example.SyncDisplaySubscriber -- onSubscribe(): s=reactor.core.publisher.FluxArray$ArraySubscription@245b4bdc
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- onSubscribe(): requestFusion()=1
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): next=1
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): next=2
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): next=3
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): next=4
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): next=5
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): next=6
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): next=7
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): next=8
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): next=9
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): next=10
10:55:23.905 [main] INFO  example.SyncDisplaySubscriber -- drain(): complete
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onSubscribe(): s=reactor.core.publisher.StrictSubscriber@3590fc5b
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onNext(): t=1
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onNext(): t=2
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onNext(): t=3
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onNext(): t=4
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onNext(): t=5
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onNext(): t=6
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onNext(): t=7
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onNext(): t=8
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onNext(): t=9
10:55:23.917 [main] INFO  example.DisplaySubscriber -- onNext(): t=10
10:55:23.920 [main] INFO  example.DisplaySubscriber -- onComplete()
Enter fullscreen mode Exit fullscreen mode

Please note that signal sent from source and received on subscriber is under the same thread. And if the subscriber contains org.reactivestreams.Subscriber only, it means that the data flow should stick on Reactive Streams's request and onNext cycle.

Top comments (0)